powebar polling
All checks were successful
Deploy powerbar.ti Frontend Portal / Deploy-Tinance2-Frontend-Portal-Production (push) Successful in 39s

This commit is contained in:
Matthew Frost 2024-02-07 18:18:33 +01:00
parent 16c6447aee
commit ac0e14fd6b
6 changed files with 55 additions and 15 deletions

View file

@ -5,27 +5,23 @@ from app.settings import powerbars
from app.utils import get_baytech_status_outlet_telnet from app.utils import get_baytech_status_outlet_telnet
from flask_socketio import SocketIO, emit from flask_socketio import SocketIO, emit
from gevent import monkey from gevent import monkey
from app.jobs import scheduler
socketio = SocketIO(async_mode="gevent", logger=True, engineio_logger=True, cors_allowed_origins="*") socketio = SocketIO(async_mode="gevent", logger=True, engineio_logger=True, cors_allowed_origins="*")
@socketio.on('connect', namespace='/powerupdates') @socketio.on('connect', namespace='/powerupdates')
def connect_handler(): def connect_handler():
print('connected') print('connected')
emit('response', {'meta': 'WS connected'}) emit('response', {'meta': 'WS connected'})
def create_app(): def powerbar_status():
# create and configure the app
app = Flask(__name__)
app.config.from_pyfile('settings.py')
app.register_blueprint(routes)
for powerbar_name, powerbar_info in powerbars.items(): for powerbar_name, powerbar_info in powerbars.items():
print(f"Checking powerbar state {powerbar_name}") print(f"Checking powerbar state {powerbar_name}")
if powerbar_info['type'] == 'baytech': if powerbar_info['type'] == 'baytech':
try: try:
print("Is a baytech powerbar, getting outlet status") print("Is a baytech powerbar, getting outlet status")
if powerbar_info['method'] == "telnet": if powerbar_info['method'] in ["telnet", "batch_telnet"]:
outlets = get_baytech_status_outlet_telnet(powerbar_name) outlets = get_baytech_status_outlet_telnet(powerbar_name)
try: try:
@ -38,10 +34,20 @@ def create_app():
except: except:
print(f"Failed to get outlet status from powerbar {powerbar_name}") print(f"Failed to get outlet status from powerbar {powerbar_name}")
continue continue
return
def create_app():
# create and configure the app
app = Flask(__name__)
app.config.from_pyfile('settings.py')
app.register_blueprint(routes)
powerbar_status()
scheduler.init_app(app)
socketio.init_app(app) socketio.init_app(app)
return app return app
app = create_app() app = create_app()
scheduler.start()
@app.errorhandler(403) @app.errorhandler(403)
def not_authorised(e): def not_authorised(e):
@ -53,3 +59,4 @@ def internal_server_error(e):
if not app.config["DEBUG"]: if not app.config["DEBUG"]:
monkey.patch_all() monkey.patch_all()

35
app/jobs.py Normal file
View file

@ -0,0 +1,35 @@
from flask_apscheduler import APScheduler
from app.settings import powerbars
from app.utils import run_telnet_command
scheduler = APScheduler()
@scheduler.task('interval', id='do_periodic_serial_job', seconds=3)
def periodic_serial_job():
print("Running Sync Job")
for powerbar in powerbars:
on_command = "On "
off_command = "Off "
print(f"Checking powerbar state {powerbar}")
for outlet in powerbars[powerbar]['outlets']:
outlet_status = powerbars[powerbar]['outlets'][outlet].get('state', 'unknown')
if outlet_status is not "unknown":
if outlet_status == "on":
on_command += f"{outlet},"
if outlet_status == "off":
off_command += f"{outlet},"
print(f"Turning on outlets: {on_command.rstrip(',')}")
if on_command is not "On ":
print("Running On command")
# run_telnet_command(powerbar, on_command.rstrip(','))
print(f"Turning off outlets: {off_command.rstrip(',')}")
if off_command is not "Off ":
print("Running Off command")
# run_telnet_command(powerbar, off_command.rstrip(','))

View file

@ -22,7 +22,7 @@ powerbars = {
"password" : "admin", "password" : "admin",
"port" : 2167, "port" : 2167,
"type" : "baytech", "type" : "baytech",
"method" : "telnet", "method" : "batch_telnet",
"description" : "Powerbar Controlling Aux Space", "description" : "Powerbar Controlling Aux Space",
"outlets" : { "outlets" : {
4 : { "name" : "Power solderlane" }, 4 : { "name" : "Power solderlane" },
@ -45,7 +45,7 @@ powerbars = {
"password" : "admin", "password" : "admin",
"port" : 2168, "port" : 2168,
"type" : "baytech", "type" : "baytech",
"method" : "telnet", "method" : "batch_telnet",
"description" : "Powerbar Controlling Main Space", "description" : "Powerbar Controlling Main Space",
"outlets" : { "outlets" : {
9 : { "name" : "3D Printers" }, 9 : { "name" : "3D Printers" },

File diff suppressed because one or more lines are too long

View file

@ -2,7 +2,6 @@ from flask import render_template, Blueprint, request
from app.settings import powerbars, groups from app.settings import powerbars, groups
from app.utils import run_telnet_command from app.utils import run_telnet_command
from flask import jsonify from flask import jsonify
import telnetlib
import time import time
import app import app
@ -11,8 +10,6 @@ routes = Blueprint('routes', __name__)
# Create a dictionary to store the active telnet connections # Create a dictionary to store the active telnet connections
active_telnet_connections = {} active_telnet_connections = {}
def find_group_by_name(name): def find_group_by_name(name):
for group in groups: for group in groups:
if group["name"] == name: if group["name"] == name:
@ -25,7 +22,7 @@ def vaild_power_bar(powerbar):
else: else:
return False return False
def vaild_outlet (powerbar, outlet): def vaild_outlet(powerbar, outlet):
if outlet in powerbars[powerbar]['outlets']: if outlet in powerbars[powerbar]['outlets']:
return True return True
else: else:

View file

@ -2,4 +2,5 @@ flask
telnetlib3 telnetlib3
flask-socketio flask-socketio
gunicorn gunicorn
gevent gevent
Flask-APScheduler