# Python source listing: 168 lines, 6.2 KiB
import binascii
import os
import threading
import time

import requests
import serial
from requests.exceptions import RequestException, ConnectionError
|
|
|
|
# Constants
# Deployment-specific settings may be overridden with TINANCE2_* environment
# variables; the fallbacks are the values this script has always used.
SERIAL_PORT = os.environ.get("TINANCE2_SERIAL_PORT", '/dev/tty.usbserial-0001')
BAUD_RATE = int(os.environ.get("TINANCE2_BAUD_RATE", "9600"))
TINANCE2_READER_IDENTIFIER = os.environ.get("TINANCE2_READER_IDENTIFIER", "CNC-READER-1")
# NOTE(review): a key committed to source control should be treated as exposed;
# supply the real key through the environment and rotate this default.
TINANCE2_READER_ACCESS_KEY = os.environ.get(
    "TINANCE2_READER_ACCESS_KEY", "4f95166a-8a15-4963-bad8-35c7047b4269"
)
API_BASE_URL = os.environ.get(
    "TINANCE2_API_BASE_URL", 'http://localhost:8000/accesscontrol/api/'
)

# Reader configuration: mutable state shared by the worker functions. It is
# filled in by fetch_initial_reader_config() and kept current by
# reader_config_check().
reader_info = {
    "enabled": False,
    "mode": None,
}

# Headers for API requests: every call identifies this reader and sends its key.
HEADERS = {
    "Content-Type": "application/json",
    "X-Identifier": TINANCE2_READER_IDENTIFIER,
    "X-Access-Key": TINANCE2_READER_ACCESS_KEY,
}

# Maximum number of attempts for a card-check POST and the pause between them.
MAX_RETRY_ATTEMPTS = 5
RETRY_DELAY_SECONDS = 10
|
|
|
|
def handle_log_msg(prefix, value):
    """Print a log or status message received from the reader.

    Args:
        prefix: Message type, i.e. the text before ``=`` on the serial line.
        value: Message payload, i.e. the text after ``=``.
    """
    # Label the output with the real prefix: serial_router also registers this
    # handler for DOOR_DISABLED_STATUS and DOOR_LOCK_STATUS, which used to be
    # reported as LOG_MSG. Output for LOG_MSG itself is unchanged.
    print(f"Received {prefix}: {value}")
|
|
|
|
def send_http_request_with_retry(url, data, headers, timeout=10):
    """POST ``data`` as JSON to ``url``, retrying failures that may be transient.

    On a 200 response the reader is told over serial to ``unlock`` (mode
    ``LATCH``) or ``toggle`` (mode ``TOGGLE``), according to
    ``reader_info["mode"]``. Responses with status 400, 401, 403 or 500 end the
    call immediately. Any other status, or a requests exception, is retried up
    to ``MAX_RETRY_ATTEMPTS`` attempts with ``RETRY_DELAY_SECONDS`` between
    them.

    Args:
        url: Endpoint to POST to.
        data: JSON-serialisable dict; must contain ``full_card_id``.
        headers: HTTP headers to send.
        timeout: Seconds to wait for the server on each attempt. Without a
            timeout ``requests`` can block indefinitely, which would stall the
            thread that reads the serial port.
    """
    for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
        try:
            response = requests.post(url, json=data, headers=headers, timeout=timeout)
        except (RequestException, ConnectionError) as e:
            print(f"HTTP request failed with exception: {e}")
        else:
            status = response.status_code
            if status == 200:
                print(f"Sent FULL_CARD_ID: {data['full_card_id']} via HTTP POST successfully")
                mode = reader_info.get("mode")
                if mode == "LATCH":
                    ser.write("unlock".encode('utf-8'))
                elif mode == "TOGGLE":
                    ser.write("toggle".encode('utf-8'))
                return  # Successful request, no need to retry

            if status in (400, 401, 403, 500):
                # Don't retry on certain errors
                print(f"HTTP POST request failed with status code {status}. Not retrying.")
                return

            print(f"Failed to send FULL_CARD_ID via HTTP POST. Status code: {status}")
            print(response.text)

        # Reached only when this attempt failed in a retryable way.
        if attempt < MAX_RETRY_ATTEMPTS:
            print(f"Retrying in {RETRY_DELAY_SECONDS} seconds...")
            time.sleep(RETRY_DELAY_SECONDS)
        else:
            print("Max retry attempts reached. Request failed.")
|
|
|
|
def check_valid_card(prefix, value):
    """Submit a scanned card ID to the API's ``check-card-id`` endpoint.

    The ID ``0:0`` is reported as invalid and dropped without contacting the
    server.

    Args:
        prefix: Message type from the serial line (not used here).
        value: The card ID string read from the reader.
    """
    if value != "0:0":
        endpoint = f'{API_BASE_URL}check-card-id'
        payload = {"full_card_id": value}
        send_http_request_with_retry(endpoint, payload, HEADERS)
        return

    print("Received invalid card ID (0:0), ignoring...")
|
|
|
|
def fetch_initial_reader_config(timeout=10):
    """Load the reader's ``enabled`` and ``mode`` settings from the API once.

    On a 200 response the values are stored in ``reader_info``. On any failure
    ``reader_info`` is left untouched and the problem is printed, so a failed
    startup fetch is visible instead of silent.

    Args:
        timeout: Seconds to wait for the server; without it the request could
            block startup indefinitely.
    """
    url = f'{API_BASE_URL}readerinfo'
    try:
        response = requests.get(url, headers=HEADERS, timeout=timeout)
        if response.status_code != 200:
            print(f"Failed to fetch initial reader config. Status code: {response.status_code}")
            return
        data = response.json()
    except (RequestException, ConnectionError, ValueError) as e:
        # ValueError covers a body that is not valid JSON on requests versions
        # whose JSON decode error is not a RequestException.
        print(f"Error fetching initial reader config: {e}")
        return

    reader_info["enabled"] = data.get("enabled", False)
    reader_info["mode"] = data.get("mode")
|
|
|
|
def _apply_reader_config(new_enabled, new_mode):
    """Send serial commands for any change from ``reader_info`` and record it."""
    if new_enabled != reader_info["enabled"]:
        print("New reader enabled status: ", new_enabled)
        if new_enabled:
            print("Enabling reader...")
            ser.write("enable".encode('utf-8'))
        else:
            print("Disabling reader...")
            ser.write("disable".encode('utf-8'))
        reader_info["enabled"] = new_enabled

    # Keep the old mode so both transition checks compare against it; checking
    # after reader_info["mode"] is overwritten can never match.
    previous_mode = reader_info["mode"]
    if new_mode != previous_mode:
        print("Reader mode has changed: ", new_mode)
        if new_mode == "LATCH" and previous_mode == "TOGGLE":
            print("Transitioned from TOGGLE to LATCH, sending 'lock' over serial...")
            ser.write("lock".encode("utf-8"))
        elif new_mode == "TOGGLE" and previous_mode == "LATCH":
            # NOTE(review): the earlier log line here claimed to send 'unlock',
            # but no serial write was ever issued. Unlocking a door is not
            # added on a guess; confirm the intended behaviour.
            print("Transitioned from LATCH to TOGGLE, no serial command sent")
        reader_info["mode"] = new_mode


def reader_config_check(poll_interval=10, timeout=10):
    """Poll the API for reader configuration changes and apply them, forever.

    Each cycle fetches ``readerinfo``. On a 200 response the ``enabled`` and
    ``mode`` values are compared with ``reader_info`` and any difference is
    pushed to the reader over serial. Request failures are printed and the
    loop carries on with the next cycle.

    Args:
        poll_interval: Seconds to sleep between polls, also after a failure.
        timeout: Seconds to wait for the server on each poll; without it a
            stalled connection would stop polling altogether.
    """
    url = f'{API_BASE_URL}readerinfo'

    while True:
        try:
            print("Checking HTTP endpoint for new reader config...")
            response = requests.get(url, headers=HEADERS, timeout=timeout)

            if response.status_code == 200:
                data = response.json()
                _apply_reader_config(data.get("enabled", False), data.get("mode"))
        except (RequestException, ConnectionError) as e:
            print(f"HTTP request failed with exception: {e}")
        except ValueError as e:
            # Body was not valid JSON (requests versions whose decode error is
            # not a RequestException); keep the polling thread alive.
            print(f"Reader config response was not valid JSON: {e}")

        time.sleep(poll_interval)  # Wait before the next check or retry
|
|
|
|
def serial_router():
    """Read lines from the serial port forever and dispatch them by prefix.

    Each line is expected to look like ``PREFIX=value``. Lines without ``=``
    are ignored, unknown prefixes are reported, and known prefixes are passed
    to their handler as ``handler(prefix, value)``. Bytes that are not valid
    UTF-8 are printed as hex instead.
    """
    # Built once rather than on every line received.
    actions = {
        "LOG_MSG": handle_log_msg,
        "FULL_CARD_ID": check_valid_card,
        "DOOR_DISABLED_STATUS": handle_log_msg,
        "DOOR_LOCK_STATUS": handle_log_msg,
    }

    while True:
        raw = ser.readline()

        # Decode in its own try so the hex dump really runs on bad bytes;
        # `raw` is still the bytes object here.
        try:
            line = raw.decode('utf-8')
        except UnicodeDecodeError:
            hex_data = binascii.hexlify(raw).decode('utf-8')
            print(f"Received non-UTF-8 data: {hex_data}")
            continue

        # partition splits on the first '=' only, so a value may contain '='.
        prefix, separator, value = line.partition('=')
        if not separator:
            continue
        prefix = prefix.strip()
        value = value.strip()

        handler = actions.get(prefix)
        if handler is None:
            print(f"Unknown prefix: {prefix}")
            continue

        # Best effort: a failing handler must not kill the reader thread, but
        # report what actually went wrong instead of blaming decoding.
        try:
            handler(prefix, value)
        except Exception as e:
            print(f"Error handling {prefix} message: {e}")
|
|
|
|
def start_thread(thread_function):
    """Run ``thread_function`` in a new daemon thread.

    Args:
        thread_function: Callable taking no arguments.

    Returns:
        The already-started ``threading.Thread``.
    """
    worker = threading.Thread(target=thread_function, daemon=True)
    worker.start()
    return worker
|
|
|
|
if __name__ == "__main__":
    # Module-level serial handle; the worker functions use it as a global.
    ser = serial.Serial(SERIAL_PORT, BAUD_RATE)

    # Load the reader's starting configuration before any worker runs.
    fetch_initial_reader_config()

    # Launch the serial reader first, then the config poller.
    workers = []
    for worker_function in (serial_router, reader_config_check):
        workers.append(start_thread(worker_function))

    # Both workers loop forever, so these joins keep the main thread alive.
    for worker in workers:
        worker.join()
|