"""Bridge between a serial-attached card reader and an access-control HTTP API.

Two threads run forever:

* ``serial_router`` reads ``PREFIX=value`` lines from the serial port and
  dispatches them (card IDs are posted to the ``check-card-id`` endpoint).
* ``reader_config_check`` polls the ``readerinfo`` endpoint and forwards
  enable/disable and mode changes to the reader over serial.
"""

import binascii
import threading
import time

import requests
import serial
from requests.exceptions import RequestException, ConnectionError

# Constants
SERIAL_PORT = '/dev/tty.usbserial-0001'
BAUD_RATE = 9600
TINANCE2_READER_IDENTIFIER = "TEST-READER-1"
# NOTE(review): credential is hard-coded in source; consider loading it from
# the environment or a config file before deploying.
TINANCE2_READER_ACCESS_KEY = "4f95166a-8a15-4963-bad8-35c7047b4269"
API_BASE_URL = 'http://localhost:8000/accesscontrol/api/'

# Reader configuration (shared between both threads)
reader_info = {
    "enabled": False,
    "mode": None,
}

# Headers for API requests
HEADERS = {
    "Content-Type": "application/json",
    "X-Identifier": TINANCE2_READER_IDENTIFIER,
    "X-Access-Key": TINANCE2_READER_ACCESS_KEY,
}

# Define a maximum number of retry attempts and a delay between retries
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY_SECONDS = 10

# Upper bound for any single HTTP call; without it a stalled server would
# block the calling thread indefinitely.
REQUEST_TIMEOUT_SECONDS = 10

# How often reader_config_check polls the API for configuration changes.
CONFIG_POLL_INTERVAL_SECONDS = 15

# Serial connection; assigned in the ``__main__`` block before threads start.
ser = None


def handle_log_msg(prefix, value):
    """Print a message received from the reader.

    Args:
        prefix: The message type that preceded ``=`` on the serial line.
        value: The text that followed ``=``.
    """
    # Use the real prefix so status messages are not mislabeled as LOG_MSG.
    print(f"Received {prefix}: {value}")


def send_http_request_with_retry(url, data, headers):
    """POST ``data`` as JSON to ``url``, retrying on failure.

    On a 200 response the reader is told to ``unlock`` (LATCH mode) or
    ``toggle`` (TOGGLE mode) over serial and the function returns. Any other
    status code or a request exception counts as a failed attempt; up to
    ``MAX_RETRY_ATTEMPTS`` attempts are made, ``RETRY_DELAY_SECONDS`` apart.

    Args:
        url: Endpoint to post to.
        data: JSON-serializable dict; must contain ``"full_card_id"``.
        headers: HTTP headers to send with the request.
    """
    for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
        try:
            response = requests.post(
                url,
                json=data,
                headers=headers,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            if response.status_code == 200:
                print(f"Sent FULL_CARD_ID: {data['full_card_id']} via HTTP POST successfully")
                mode = reader_info.get("mode")
                if mode == "LATCH":
                    ser.write("unlock".encode('utf-8'))
                elif mode == "TOGGLE":
                    ser.write("toggle".encode('utf-8'))
                return  # Successful request, no need to retry
            print(f"Failed to send FULL_CARD_ID via HTTP POST. Status code: {response.status_code}")
            print(response.text)
        except (RequestException, ConnectionError) as e:
            print(f"HTTP request failed with exception: {e}")

        if attempt < MAX_RETRY_ATTEMPTS:
            print(f"Retrying in {RETRY_DELAY_SECONDS} seconds...")
            time.sleep(RETRY_DELAY_SECONDS)
        else:
            print("Max retry attempts reached. Request failed.")


def check_valid_card(prefix, value):
    """Submit a scanned card ID to the ``check-card-id`` endpoint.

    Args:
        prefix: The serial message prefix (unused; present so the function
            matches the dispatch signature used by ``serial_router``).
        value: The card ID string; ``"0:0"`` is ignored.
    """
    if value == "0:0":
        print("Received invalid card ID (0:0), ignoring...")
        return

    url = f'{API_BASE_URL}check-card-id'
    send_http_request_with_retry(url, {"full_card_id": value}, HEADERS)


def fetch_initial_reader_config():
    """Load the ``enabled``/``mode`` settings from the API into ``reader_info``.

    Failures are printed and otherwise ignored, leaving ``reader_info`` at
    its defaults.
    """
    url = f'{API_BASE_URL}readerinfo'
    try:
        response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT_SECONDS)
        if response.status_code == 200:
            data = response.json()
            reader_info["enabled"] = data.get("enabled", False)
            reader_info["mode"] = data.get("mode")
    # ValueError covers a non-JSON body on requests versions whose
    # JSONDecodeError is not a RequestException.
    except (RequestException, ConnectionError, ValueError) as e:
        print(f"Error fetching initial reader config: {e}")


def _apply_reader_config(data):
    """Compare a freshly fetched config dict with ``reader_info`` and act on changes."""
    new_enabled = data.get("enabled", False)
    new_mode = data.get("mode")

    if new_enabled != reader_info["enabled"]:
        print("New reader enabled status: ", new_enabled)
        if new_enabled:
            print("Enabling reader...")
            ser.write("enable".encode('utf-8'))
        else:
            print("Disabling reader...")
            ser.write("disable".encode('utf-8'))
        reader_info["enabled"] = new_enabled

    old_mode = reader_info["mode"]
    if new_mode != old_mode:
        print("Reader mode has changed: ", new_mode)
        if new_mode == "LATCH" and old_mode == "TOGGLE":
            print("Transitioned from TOGGLE to LATCH, sending 'lock' over serial...")
            ser.write("lock".encode("utf-8"))
        # NOTE(review): the previous code had a second branch here, meant for
        # the LATCH -> TOGGLE transition, that could never run (it repeated
        # the TOGGLE -> LATCH test after the mode had already been updated)
        # and never wrote to serial. It was removed rather than guessed at;
        # confirm whether an 'unlock' should be sent on LATCH -> TOGGLE.
        reader_info["mode"] = new_mode


def reader_config_check():
    """Poll the ``readerinfo`` endpoint forever and push changes to the reader.

    Network or decoding errors are printed and the loop continues, so a
    temporary API outage does not end the polling thread.
    """
    url = f'{API_BASE_URL}readerinfo'
    while True:
        print("Checking HTTP endpoint for new reader config...")
        data = None
        try:
            response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT_SECONDS)
            if response.status_code == 200:
                data = response.json()
        except (RequestException, ConnectionError, ValueError) as e:
            print(f"Error checking reader config: {e}")

        if isinstance(data, dict):
            _apply_reader_config(data)

        time.sleep(CONFIG_POLL_INTERVAL_SECONDS)  # Check every 15 seconds


def serial_router():
    """Read lines from the serial port forever and dispatch ``PREFIX=value`` messages.

    Lines without ``=`` are ignored. Lines that are not valid UTF-8 are
    printed as hex.
    """
    actions = {
        "LOG_MSG": handle_log_msg,
        "FULL_CARD_ID": check_valid_card,
        "DOOR_DISABLED_STATUS": handle_log_msg,
        "DOOR_LOCK_STATUS": handle_log_msg,
    }

    while True:
        raw = ser.readline()
        try:
            line = raw.decode('utf-8')
        except UnicodeDecodeError:
            hex_data = binascii.hexlify(raw).decode('utf-8')
            print(f"Received non-UTF-8 data: {hex_data}")
            continue

        if '=' not in line:
            continue

        # Split on the first '=' only, so a value that itself contains '='
        # does not raise and end the thread.
        prefix, value = line.split('=', 1)
        prefix = prefix.strip()
        value = value.strip()

        handler = actions.get(prefix)
        if handler is not None:
            handler(prefix, value)
        else:
            print(f"Unknown prefix: {prefix}")


if __name__ == "__main__":
    ser = serial.Serial(SERIAL_PORT, BAUD_RATE)

    # Fetch the initial reader configuration
    fetch_initial_reader_config()

    # Create and start the two threads
    serial_router_thread = threading.Thread(target=serial_router)
    reader_config_check_thread = threading.Thread(target=reader_config_check)

    serial_router_thread.start()
    reader_config_check_thread.start()

    # Wait for both threads to finish (which is never in this case since they run indefinitely)
    serial_router_thread.join()
    reader_config_check_thread.join()