"""Small Flask web app that also places a SIP call at start-up.

On start the script seeds a visitor-counter file, loads settings from the
environment (optionally via a ``.env`` file), places one outgoing SIP call
with pjsua2, creates a UniFi controller client and finally serves the
portal pages with Flask.
"""
import asyncio
import os
import time
from os import getenv
from ssl import get_server_certificate

import pjsua2 as pj
from dotenv import load_dotenv
from flask import Flask, request, render_template, Response, send_from_directory
from markupsafe import escape
from unificontrol import UnifiClient

app = Flask(__name__)
app.config['TEMPLATES_AUTO_RELOAD'] = True

# UniFi controller client. It is created in main(); it stays None when the
# module is merely imported, so views must not assume it is set.
unifi = None
VISITOR_COUNT_FILE = "visitor_count"
SIP = None


def phone(hostname, username, password, *, extension="3002",
          call_seconds=30, sip_port=5060):
    """Register at a SIP server, call one extension, then shut down.

    The function blocks for ``call_seconds`` while the call is up and
    always tears the pjsua2 endpoint down afterwards.

    Args:
        hostname: SIP registrar / server host.
        username: SIP account user name.
        password: SIP account password.
        extension: User part of the SIP URI to dial.
        call_seconds: How long to keep the endpoint alive after dialling.
        sip_port: Local UDP port for the SIP transport.
    """
    lib = pj.Endpoint()

    class MyAccount(pj.Account):
        """Account that logs registration state changes."""

        def onRegState(self, prm):
            print("Registration state:", prm.code)

    class MyCall(pj.Call):
        """Outgoing call that bridges its audio to the sound device."""

        def __init__(self, acc, dest_uri):
            pj.Call.__init__(self, acc)
            self.makeCall(dest_uri, pj.CallOpParam(True))

        def onCallState(self, prm):
            ci = self.getInfo()
            print("Call state:", ci.stateText)

        def onCallMediaState(self, prm):
            ci = self.getInfo()
            # `lib` is the Endpoint itself, so the audio device manager is
            # reached directly on it (there is no `lib.endpoint` attribute).
            dev_manager = lib.audDevManager()
            for mi in ci.media:
                # NOTE(review): the pjsua2 Python bindings usually expose
                # these enums as module constants (pj.PJMEDIA_TYPE_AUDIO,
                # pj.PJSUA_CALL_MEDIA_ACTIVE) -- confirm these names exist.
                if mi.type == pj.MediaType.AUDIO and mi.status == pj.MediaStatus.ACTIVE:
                    # NOTE(review): confirm getMedia() returns an object
                    # with startTransmit in the installed pjsua2 version.
                    aud_med = self.getMedia(mi.index)
                    # Remote audio -> speaker, microphone -> remote.
                    aud_med.startTransmit(dev_manager.getPlaybackDevMedia())
                    dev_manager.getCaptureDevMedia().startTransmit(aud_med)

    # Initialize lib
    lib.libCreate()
    try:
        lib.libInit(pj.EpConfig())

        transport_cfg = pj.TransportConfig()
        transport_cfg.port = sip_port
        lib.transportCreate(pj.PJSIP_TRANSPORT_UDP, transport_cfg)
        lib.libStart()

        # Account config
        acc_cfg = pj.AccountConfig()
        acc_cfg.idUri = f"sip:{username}@{hostname}"
        acc_cfg.regConfig.registrarUri = f"sip:{hostname}"
        cred = pj.AuthCredInfo("digest", "*", username, 0, password)
        acc_cfg.sipConfig.authCreds.append(cred)

        acc = MyAccount()
        acc.create(acc_cfg)

        # Make call; the name is bound so the object lives while we wait.
        call = MyCall(acc, f"sip:{extension}@{hostname}")

        # Keep the endpoint running while the call is in progress.
        time.sleep(call_seconds)
    finally:
        # Clean up even when registration or dialling raised.
        lib.libDestroy()


def _require_env(name):
    """Return environment variable ``name`` or raise if it is not set."""
    value = getenv(name)
    if value is None:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


def main():
    """Seed the counter file, place the SIP call and start the web server.

    Raises:
        RuntimeError: If a required environment variable is missing.
    """
    global unifi

    # Create the counter file only when it does not exist yet. Mode "x" is
    # atomic and portable, unlike an exists() check followed by os.mknod().
    try:
        with open(VISITOR_COUNT_FILE, "x") as f:
            f.write("0")
    except FileExistsError:
        pass

    load_dotenv()
    hostname = _require_env("HOSTNAME")
    username = _require_env("USERNAME")
    password = _require_env("PASSWORD")
    sip_server = _require_env("SIP_SERVER")
    sip_username = _require_env("SIP_USERNAME")
    sip_password = _require_env("SIP_PASSWORD")

    # Blocks until the call window is over; the web server starts after it.
    phone(sip_server, sip_username, sip_password)

    cert = get_server_certificate((hostname, 443))
    unifi = UnifiClient(host=hostname, username=username, password=password, cert=cert)
    app.run(host="0.0.0.0", port=80)


def visitor_count():
    """Increment the persisted visitor counter and return the new value.

    Returns:
        The updated count, or 1337 when the counter file cannot be read,
        parsed or written.
    """
    fallback = 1337
    # Yes there is a race condition here, I don't care
    try:
        with open(VISITOR_COUNT_FILE, "r+") as f:
            raw = f.read().strip()
            # An empty file counts as zero rather than as a parse error.
            visitors = (int(raw) if raw else 0) + 1
            f.seek(0)
            f.write(str(visitors))
            f.truncate()
    except (OSError, ValueError) as e:
        print(f"error {e}")
        return fallback
    return visitors


def _render_portal(dialing_up):
    """Render ``index.html`` for the ``id`` given in the query string."""
    guest_id = escape(request.args.get('id'))
    page = render_template(
        "index.html",
        dialing_up=dialing_up,
        id=guest_id,
        visitor_count=visitor_count(),
    )
    return Response(page, mimetype="text/html")


@app.route('/guest/s/default/', methods=["GET"])
def root():
    """Serve the landing page in its idle (not dialling) state."""
    return _render_portal(False)


@app.route("/dialup", methods=["POST"])
def dialup():
    """Serve the landing page in its dialling state."""
    # Spawn audio on phone
    # authorize guest after a certain amount of time
    # TODO: guest authorization is still disabled; the intended call is
    # unifi.authorize_guest(<id>, 1).
    return _render_portal(True)


@app.route('/afbeeldingen/<path:path>')
def static_folder(path):
    """Serve a file from the ``static`` directory under /afbeeldingen/."""
    return send_from_directory('static', path)


if __name__ == "__main__":
    main()