diff --git a/modules/web.py b/modules/web.py index 0fcf30a..9b5b173 100644 --- a/modules/web.py +++ b/modules/web.py @@ -1,5 +1,6 @@ import multiprocessing, os from multiprocessing.synchronize import Event +from queue import Empty import json import threading, uuid, time import asyncio @@ -84,14 +85,15 @@ async def broadcast_worker(ws_q: multiprocessing.Queue, clients: set): """ loop = asyncio.get_event_loop() while True: - msg = await loop.run_in_executor(None, ws_q.get) + try: msg = asyncio.wait_for(loop.run_in_executor(None, ws_q.get), 1.0) + except asyncio.TimeoutError: continue if msg is None: break payload = json.dumps(msg) if clients: - coros = [] - for ws in list(clients): - coros.append(_safe_send(ws, payload, clients)) - await asyncio.gather(*coros) + await asyncio.gather( + *[_safe_send(ws, payload, clients) for ws in list(clients)], + return_exceptions=True + ) async def _safe_send(ws, payload: str, clients: set): @@ -117,7 +119,7 @@ def websocket_server_process(shared_data: dict, imc_q: multiprocessing.Queue, ws # register client clients.add(websocket) try: await ws_handler(websocket, shared_data, imc_q, ws_q) - finally: + finally: await websocket.close(1001, "") clients.discard(websocket) async def process_request(websocket: ServerConnection, request: Request): @@ -129,7 +131,7 @@ def websocket_server_process(shared_data: dict, imc_q: multiprocessing.Queue, ws Headers([("Content-Type", "text/html"), ("Content-Length", f"{len(data)}")]), data ) - if not "upgrade" in request.headers.get("Connection", "").lower(): + if not "upgrade" in request.headers.get("Connection", "").lower(): return Response( 426, "Upgrade Required", @@ -142,13 +144,21 @@ def websocket_server_process(shared_data: dict, imc_q: multiprocessing.Queue, ws broadcaster = asyncio.create_task(broadcast_worker(ws_q, clients)) await stop_evt.wait() + close_tasks = [ws.close(1001, "Server shutting down") for ws in list(clients)] + await asyncio.gather(*close_tasks, return_exceptions=True) + clients.clear() server.close() ws_q.put(None) await server.wait_closed() watcher.cancel() broadcaster.cancel() - await asyncio.gather(watcher, broadcaster, return_exceptions=True) - return + try: + await asyncio.wait_for( + asyncio.gather(watcher, broadcaster, return_exceptions=True), + timeout=2.0 + ) + except asyncio.TimeoutError: + pass loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -185,10 +195,11 @@ class Module(PlayerModule): """ while self.ipc_thread_running: try: - message: dict | None = self.imc_q.get() + message: dict | None = self.imc_q.get(timeout=0.5) if message is None: break out = self._imc.send(self, message["name"], message["data"]) if key := message.get("key", None): self.data[key] = out + except Empty: continue except Exception: pass def on_new_playlist(self, playlist: list[Track], global_args: dict[str, str]) -> None: @@ -226,19 +237,20 @@ class Module(PlayerModule): def shutdown(self): self.ipc_thread_running = False - self.imc_q.put(None) + + try:self.imc_q.put(None) + except: pass + self.ipc_thread.join(timeout=2) - self.shutdown_evt.set() - self.ws_process.join(timeout=5) if self.ws_process.is_alive(): self.ws_process.terminate() self.ws_process.join(timeout=2) - while self.ws_process.is_alive(): + if self.ws_process.is_alive(): self.ws_process.kill() - self.ws_process.close() + self.ws_process.join(timeout=1) module = Module()