import telnetlib import re from app.settings import powerbars, groups import time retries = 5 def check_input_error(tn): tn.write("/\r\n".encode('ascii')) for _ in range(3): print(f"Checking for input error attempt {_}") output = tn.read_until(b"Input error", timeout=3).decode('ascii') if "Input error" in output: return True return False def get_telnet_connection(powerbar): tn = telnetlib.Telnet(powerbars[powerbar]['host'], powerbars[powerbar]['port']) return tn def run_telnet_command(powerbar, command): for _ in range(retries): try: tn = get_telnet_connection(powerbar) tn.write(f"{command}\r\n".encode('ascii')) time.sleep(1) tn.write(f"{command}\r\n".encode('ascii')) tn.close() return True except Exception as e: print(f"Run Telnet Command Error: {e}") time.sleep(1) # Wait for 1 second before retrying pass return False def get_baytech_status_outlet_telnet(powerbar): for _ in range(retries): try: # Create a Telnet object and connect to the host tn = telnetlib.Telnet(powerbars[powerbar]['host'], powerbars[powerbar]['port']) # Send the command you want to execute command = "status" tn.write(command.encode('ascii') + b"\r\n") # Read the output until you receive the prompt or a timeout occurs output = tn.read_until(b"", timeout=5).decode('ascii') # Close the Telnet connection tn.close() pattern = r"(\d+)\)\.\.\.Outlet\s+(\d+)\s+:\s+(\w+)" outlets = {} matches = re.findall(pattern, output) for match in matches: number = int(match[1]) status = match[2] outlets[number] = status return outlets except: pass return None