import time

from flask import Blueprint, jsonify, render_template, request

from app.settings import powerbars, groups
from app.utils import run_telnet_command
import app
from app.threads import queues

routes = Blueprint('routes', __name__)

# Create a dictionary to store the active telnet connections
active_telnet_connections = {}

# Seconds an outlet is left off during a 'cycle' before it is switched on again.
CYCLE_DELAY_SECONDS = 5


def find_group_by_name(name):
    """Return the first entry of ``groups`` whose "name" equals *name*, or None."""
    return next((group for group in groups if group["name"] == name), None)


def vaild_power_bar(powerbar):
    """Return True when *powerbar* is a key of the configured ``powerbars``."""
    return powerbar in powerbars


def vaild_outlet(powerbar, outlet):
    """Return True when *outlet* is a key of the given powerbar's outlets."""
    return outlet in powerbars[powerbar]['outlets']


def _send_outlet_command(powerbar, outlets, action):
    """Send *action* ('on' or 'off') for *outlets* using the powerbar's method.

    For "telnet" the outlets are joined with commas into a single command and
    the result of ``run_telnet_command`` is returned.  For "batch_telnet" one
    job per outlet is put on the powerbar's queue and True is returned.

    Raises:
        ValueError: If the powerbar's method is neither of the two above.
    """
    method = powerbars[powerbar]['method']
    if method == "telnet":
        targets = ",".join(str(outlet) for outlet in outlets)
        return run_telnet_command(powerbar, f"{action} {targets}")
    if method == "batch_telnet":
        for outlet in outlets:
            queues[powerbar].put({"outlet": outlet, "action": action})
        return True
    raise ValueError(f"Unsupported method {method!r} for powerbar {powerbar}")


def _emit_power_event(powerbar, outlet, action):
    """Emit a 'power-event' on the '/powerupdates' socket.io namespace."""
    app.socketio.emit(
        'power-event',
        {'powerbar': powerbar, 'outlet': outlet, 'action': action},
        namespace='/powerupdates',
    )


@routes.route('/')
def home():
    """Render index.html, defaulting every outlet's state to 'unknown'."""
    for powerbar in powerbars.values():
        for outlet in powerbar['outlets'].values():
            outlet.setdefault('state', 'unknown')
    return render_template('index.html', powerbars=powerbars, all_groups=groups)


# NOTE(review): the int converter assumes outlet keys in the settings are
# integers (group outlets are passed through str() before joining) - confirm.
@routes.route('/<string:powerbar>/<int:outlet>/<string:action>')
def powerbar_control(powerbar, outlet, action):
    """
    Controls the powerbar outlet based on the given parameters.

    Args:
        powerbar (str): The name of the powerbar.
        outlet (int): The outlet number.
        action (str): The action to perform on the outlet
            ('on', 'off', 'cycle', 'toggle').

    Query args:
        password: Required when the outlet's settings contain a "password".

    Returns:
        A JSON response ``{'state': ...}`` with the outlet's recorded state,
        or ``{'error': ...}`` with status 400 for an invalid action, powerbar,
        outlet or password, or status 500 when sending the command raised.
    """
    print(f"powerbar: {powerbar}")
    print(f"outlet: {outlet}")
    print(f"action: {action}")

    if action not in ('on', 'off', 'cycle', 'toggle'):
        print("Invalid action")
        return jsonify({'error': 'Invalid action'}), 400

    if not vaild_power_bar(powerbar):
        print("Invalid powerbar")
        return jsonify({'error': 'Invalid powerbar'}), 400

    if not vaild_outlet(powerbar, outlet):
        print("Invalid outlet")
        return jsonify({'error': 'Invalid outlet'}), 400

    outlet_cfg = powerbars[powerbar]['outlets'][outlet]

    if "password" in outlet_cfg:
        user_password = request.args.get('password')
        if user_password is None:
            print("Password missing")
            return jsonify({'error': 'Password missing'}), 400
        if user_password != outlet_cfg["password"]:
            return jsonify({'error': 'Password incorrect'}), 400

    # Every action shares one handler so a failing command always yields the
    # same JSON 500 response instead of an unhandled exception.
    try:
        if action in ('on', 'off'):
            print(f"Turning {action} powerbar {powerbar} outlet {outlet}")
            if _send_outlet_command(powerbar, [outlet], action):
                outlet_cfg['state'] = action
                _emit_power_event(powerbar, outlet, action)
                print(f"Turned {action} powerbar {powerbar} outlet {outlet}")

        elif action == 'cycle':
            print(f"Turning Off powerbar {powerbar} outlet {outlet}")
            if _send_outlet_command(powerbar, [outlet], "off"):
                outlet_cfg['state'] = "off"
                # The event carries the requested action ('cycle'), not 'off'.
                _emit_power_event(powerbar, outlet, action)

            # Blocks this request while the outlet stays powered down.
            time.sleep(CYCLE_DELAY_SECONDS)

            print(f"Turning On powerbar {powerbar} outlet {outlet}")
            if _send_outlet_command(powerbar, [outlet], "on"):
                outlet_cfg['state'] = "on"

            # Broadcast the state reached once the cycle has completed.
            _emit_power_event(
                powerbar, outlet, outlet_cfg.get('state', 'unknown')
            )

        else:  # toggle
            current = outlet_cfg.get('state', 'unknown')
            print(current)
            # Any state other than "on"/"off" leaves the outlet untouched.
            if current == "on":
                print(f"Turning Off powerbar {powerbar} outlet {outlet}")
                if _send_outlet_command(powerbar, [outlet], "off"):
                    outlet_cfg['state'] = "off"
            elif current == "off":
                print(f"Turning On powerbar {powerbar} outlet {outlet}")
                if _send_outlet_command(powerbar, [outlet], "on"):
                    outlet_cfg['state'] = "on"

            _emit_power_event(
                powerbar, outlet, outlet_cfg.get('state', 'unknown')
            )
    except Exception as E:
        print(f"Endpoint Exception Error: {E}")
        return jsonify({'error': 'Endpoint Exception Error'}), 500

    return jsonify({'state': outlet_cfg.get('state', 'unknown')})


@routes.route('/<string:powerbar>/<int:outlet>/state')
def powerbar_state(powerbar, outlet):
    """Return ``{'state': ...}`` for one outlet ('unknown' if never recorded).

    Responds with status 400 and an ``error`` message when the powerbar or
    the outlet is not configured.
    """
    print(f"powerbar: {powerbar}")
    print(f"outlet: {outlet}")

    if not vaild_power_bar(powerbar):
        print("Invalid powerbar")
        return jsonify({'error': 'Invalid powerbar'}), 400

    if not vaild_outlet(powerbar, outlet):
        print("Invalid outlet")
        return jsonify({'error': 'Invalid outlet'}), 400

    state = powerbars[powerbar]['outlets'][outlet].get('state', 'unknown')
    return jsonify({'state': state})


@routes.route('/powerbars')
def powerbars_list():
    """Return all powerbars as JSON, limited to an explicit set of fields.

    Outlet passwords are never copied into the response; only a
    ``password_protected`` flag is exposed for each outlet.
    """
    cleaned_powerbars = {}
    for powerbar, config in powerbars.items():
        cleaned_outlets = {}
        for outlet, outlet_cfg in config['outlets'].items():
            base_url = f"/{powerbar}/{outlet}"
            cleaned_outlets[outlet] = {
                'name': outlet_cfg['name'],
                'state': outlet_cfg.get('state', 'unknown'),
                'on_url': f"{base_url}/on",
                'off_url': f"{base_url}/off",
                'state_url': f"{base_url}/state",
                'cycle_url': f"{base_url}/cycle",
                'toggle_url': f"{base_url}/toggle",
                'password_protected': 'password' in outlet_cfg,
            }

        cleaned_powerbars[powerbar] = {
            "name": config['name'],
            "description": config['description'],
            "host": config['host'],
            "port": config['port'],
            "type": config['type'],
            "outlets": cleaned_outlets,
        }

    return jsonify(cleaned_powerbars)


@routes.route('/groups/<string:group>/<string:action>')
def powebar_group_action(group, action):
    """Switch every outlet of every device in the named group 'on' or 'off'.

    Returns:
        ``{'status': "200"}`` when every device's command succeeded and
        ``{'status': "500"}`` when at least one did not.  Status 400 with an
        ``error`` message for an invalid action or unknown group, 501 for
        'cycle' (accepted as an action name but not implemented for groups)
        and 500 when sending a command raised.
    """
    if action not in ('on', 'off', 'cycle'):
        print("Invalid action")
        return jsonify({'error': 'Invalid action'}), 400

    group_cfg = find_group_by_name(group)
    if not group_cfg:
        print("Invalid group")
        return jsonify({'error': 'Invalid group'}), 400

    if action == 'cycle':
        # Without this the view would fall through and return None, which
        # Flask rejects as an invalid response.
        print("Group cycle not implemented")
        return jsonify({'error': 'Cycle is not implemented for groups'}), 501

    try:
        all_succeeded = True
        for device in group_cfg["devices"]:
            powerbar = device["powerbar"]
            print(f"Powerbar: {powerbar}")
            outlets = ",".join([str(x) for x in device['outlets']])
            print(f"Turning {action} powerbar {powerbar} outlet {outlets}")

            if not _send_outlet_command(powerbar, device['outlets'], action):
                all_succeeded = False
                continue

            for outlet in device['outlets']:
                # Outlets missing from the settings are skipped silently.
                if outlet in powerbars[powerbar]['outlets']:
                    powerbars[powerbar]['outlets'][outlet]['state'] = action
                    _emit_power_event(powerbar, outlet, action)
                    print(f"Turned {action} powerbar {powerbar} outlet {outlet}")

        return jsonify({'status': "200" if all_succeeded else "500"})
    except Exception as E:
        print(f"Endpoint Exception Error: {E}")
        return jsonify({'error': 'Endpoint Exception Error'}), 500


@routes.route('/groups')
def powebar_groups():
    """Return the configured groups as JSON with on/off URLs added.

    The ``on_url`` and ``off_url`` keys are written onto the shared group
    dictionaries themselves, not onto copies.
    """
    try:
        for group in groups:
            group['on_url'] = f"/groups/{group['name']}/on"
            group['off_url'] = f"/groups/{group['name']}/off"
        return jsonify(groups)
    except Exception as E:
        print(f"Telnet error: {E}")
        return jsonify({'error': 'no groups defined for powerbar'}), 404