ti-powerbar/app/threads.py

39 lines
1.4 KiB
Python
Raw Normal View History

2024-02-28 19:10:53 +00:00
import threading
from app.settings import powerbars
import time
from app.utils import run_telnet_command
from queue import Queue
queues = {}
def baytech_batch_update(powerbar, powerbars, queue):
while True:
time.sleep(1)
print(f"Running Sync Job: {powerbar}")
print(f"Queue Size: {queue.qsize()}")
if queue.qsize() > 0:
print(f"Queue Size: {queue.qsize()}")
try:
job = queue.get()
print(f"Job: {job}")
outlet = job['outlet']
action = job['action']
print(f"Turning {action} powerbar {powerbar} outlet {outlet}")
command = run_telnet_command(powerbar,f"{action} {outlet}")
if command:
powerbars[powerbar]['outlets'][outlet]['state'] = action
print(f"Turned {action} powerbar {powerbar} outlet {outlet}")
except Exception as E:
print(f"Job Exception Error: {E}")
pass
def generate_threads():
for powerbar in powerbars:
if powerbars[powerbar]['method'] == 'batch_telnet':
if powerbars[powerbar]['type'] == 'baytech':
queues[powerbar] = Queue(maxsize=100)
t = threading.Thread(target=baytech_batch_update, args=[powerbar, powerbars, queues[powerbar]])
t.start()