telnet
All checks were successful
Deploy powerbar.ti Frontend Portal / Deploy-Tinance2-Frontend-Portal-Production (push) Successful in 37s
All checks were successful
Deploy powerbar.ti Frontend Portal / Deploy-Tinance2-Frontend-Portal-Production (push) Successful in 37s
This commit is contained in:
parent
40b31110e4
commit
701c5d90cd
5 changed files with 58 additions and 43 deletions
|
@ -26,7 +26,7 @@ def create_app():
|
||||||
try:
|
try:
|
||||||
print("Is a baytech powerbar, getting outlet status")
|
print("Is a baytech powerbar, getting outlet status")
|
||||||
if powerbar_info['method'] == "telnet":
|
if powerbar_info['method'] == "telnet":
|
||||||
outlets = get_baytech_status_outlet_telnet(powerbar_info['host'], powerbar_info['port'])
|
outlets = get_baytech_status_outlet_telnet(powerbar_name)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for outlet_id, outlet_info in powerbar_info['outlets'].items():
|
for outlet_id, outlet_info in powerbar_info['outlets'].items():
|
||||||
|
|
|
@ -18,7 +18,9 @@ powerbars = {
|
||||||
"powerbar-aux-space" : {
|
"powerbar-aux-space" : {
|
||||||
"name" : "Powerbar Aux Space",
|
"name" : "Powerbar Aux Space",
|
||||||
"host" : "10.209.10.111",
|
"host" : "10.209.10.111",
|
||||||
"port" : 3696,
|
"username" : "admin",
|
||||||
|
"password" : "admin",
|
||||||
|
"port" : 2167,
|
||||||
"type" : "baytech",
|
"type" : "baytech",
|
||||||
"method" : "telnet",
|
"method" : "telnet",
|
||||||
"description" : "Powerbar Controlling Aux Space",
|
"description" : "Powerbar Controlling Aux Space",
|
||||||
|
@ -39,7 +41,9 @@ powerbars = {
|
||||||
"powerbar-main-space" : {
|
"powerbar-main-space" : {
|
||||||
"name" : "Powerbar Main Space",
|
"name" : "Powerbar Main Space",
|
||||||
"host" : "10.209.10.111",
|
"host" : "10.209.10.111",
|
||||||
"port" : 3697,
|
"username" : "admin",
|
||||||
|
"password" : "admin",
|
||||||
|
"port" : 2168,
|
||||||
"type" : "baytech",
|
"type" : "baytech",
|
||||||
"method" : "telnet",
|
"method" : "telnet",
|
||||||
"description" : "Powerbar Controlling Main Space",
|
"description" : "Powerbar Controlling Main Space",
|
||||||
|
|
40
app/utils.py
40
app/utils.py
|
@ -1,15 +1,47 @@
|
||||||
import telnetlib
|
import telnetlib
|
||||||
import re
|
import re
|
||||||
|
from app.settings import powerbars, groups
|
||||||
|
|
||||||
def get_baytech_status_outlet_telnet(hostname,port):
|
def check_input_error(tn):
|
||||||
|
tn.write("/\r\n".encode('ascii'))
|
||||||
|
for _ in range(3):
|
||||||
|
print(f"Checking for input error attempt ${_}")
|
||||||
|
output = tn.read_until(b"Input error", timeout=3).decode('ascii')
|
||||||
|
if "Input error" in output:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_telnet_connection(powerbar):
|
||||||
|
tn = telnetlib.Telnet(powerbars[powerbar]['host'], powerbars[powerbar]['port'])
|
||||||
|
return tn
|
||||||
|
|
||||||
|
|
||||||
|
def run_telnet_command(powerbar, command):
|
||||||
|
try:
|
||||||
|
tn = get_telnet_connection(powerbar)
|
||||||
|
|
||||||
|
if check_input_error(tn):
|
||||||
|
tn.write(f"{command}\r\n".encode('ascii'))
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Telnet error: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_baytech_status_outlet_telnet(powerbar):
|
||||||
try:
|
try:
|
||||||
# Create a Telnet object and connect to the host
|
# Create a Telnet object and connect to the host
|
||||||
tn = telnetlib.Telnet(hostname, port)
|
tn = telnetlib.Telnet(powerbars[powerbar]['host'], powerbars[powerbar]['port'])
|
||||||
|
|
||||||
# Send the command you want to execute
|
# Send the command you want to execute
|
||||||
command = "status"
|
command = "status"
|
||||||
tn.write(command.encode('ascii') + b"\r\n")
|
|
||||||
|
|
||||||
|
if check_input_error(tn):
|
||||||
|
tn.write(command.encode('ascii') + b"\r\n")
|
||||||
|
else:
|
||||||
|
return None
|
||||||
# Read the output until you receive the prompt or a timeout occurs
|
# Read the output until you receive the prompt or a timeout occurs
|
||||||
output = tn.read_until(b"<prompt>", timeout=5).decode('ascii')
|
output = tn.read_until(b"<prompt>", timeout=5).decode('ascii')
|
||||||
# Close the Telnet connection
|
# Close the Telnet connection
|
||||||
|
|
37
app/views.py
37
app/views.py
|
@ -1,5 +1,6 @@
|
||||||
from flask import render_template, Blueprint, request
|
from flask import render_template, Blueprint, request
|
||||||
from app.settings import powerbars, groups
|
from app.settings import powerbars, groups
|
||||||
|
from app.utils import run_telnet_command
|
||||||
from flask import jsonify
|
from flask import jsonify
|
||||||
import telnetlib
|
import telnetlib
|
||||||
import time
|
import time
|
||||||
|
@ -10,42 +11,6 @@ routes = Blueprint('routes', __name__)
|
||||||
# Create a dictionary to store the active telnet connections
|
# Create a dictionary to store the active telnet connections
|
||||||
active_telnet_connections = {}
|
active_telnet_connections = {}
|
||||||
|
|
||||||
def get_telnet_connection(powerbar):
|
|
||||||
"""
|
|
||||||
Get the active telnet connection for the given powerbar.
|
|
||||||
If the connection does not exist, create a new one.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
powerbar (str): The name of the powerbar.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
telnetlib.Telnet: The telnet connection object.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if powerbar in active_telnet_connections:
|
|
||||||
return active_telnet_connections[powerbar]
|
|
||||||
else:
|
|
||||||
tn = telnetlib.Telnet(powerbars[powerbar]['host'], powerbars[powerbar]['port'])
|
|
||||||
active_telnet_connections[powerbar] = tn
|
|
||||||
return tn
|
|
||||||
|
|
||||||
|
|
||||||
def run_telnet_command(powerbar, command):
|
|
||||||
try:
|
|
||||||
tn = get_telnet_connection(powerbar)
|
|
||||||
tn.write(f"{command}\r\n".encode('ascii'))
|
|
||||||
time.sleep(0.2)
|
|
||||||
tn.write(f"{command}\r\n".encode('ascii'))
|
|
||||||
return
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Telnet error: {e}")
|
|
||||||
if powerbar in active_telnet_connections:
|
|
||||||
active_telnet_connections.pop(powerbar)
|
|
||||||
tn = get_telnet_connection(powerbar)
|
|
||||||
tn.write(f"{command}\r\n".encode('ascii'))
|
|
||||||
time.sleep(0.2)
|
|
||||||
tn.write(f"{command}\r\n".encode('ascii'))
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
def find_group_by_name(name):
|
def find_group_by_name(name):
|
||||||
|
|
14
telnet.py
Normal file
14
telnet.py
Normal file
|
@ -0,0 +1,14 @@
|
||||||
|
import telnetlib
|
||||||
|
|
||||||
|
def check_input_error():
|
||||||
|
tn = telnetlib.Telnet("10.209.10.111", 2168)
|
||||||
|
tn.write("/\r\n".encode('ascii'))
|
||||||
|
for _ in range(3):
|
||||||
|
output = tn.read_until(b"Input error", timeout=3).decode('ascii')
|
||||||
|
print(output)
|
||||||
|
if "Input error" in output:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
result = check_input_error()
|
||||||
|
print(result)
|
Loading…
Reference in a new issue