You've already forked RadioPlayer
mirror of
https://github.com/radio95-rnt/RadioPlayer.git
synced 2026-02-27 06:03:52 +01:00
OH MY FUCKING GOD
This commit is contained in:
@@ -1,5 +1,4 @@
|
|||||||
import multiprocessing, os
|
import multiprocessing, os
|
||||||
from multiprocessing.synchronize import Event
|
|
||||||
from queue import Empty
|
from queue import Empty
|
||||||
import json
|
import json
|
||||||
import threading, uuid, time
|
import threading, uuid, time
|
||||||
@@ -7,12 +6,7 @@ import asyncio
|
|||||||
import websockets
|
import websockets
|
||||||
from websockets import ServerConnection, Request, Response, Headers
|
from websockets import ServerConnection, Request, Response, Headers
|
||||||
|
|
||||||
from . import Track, PlayerModule, Path, log95
|
from . import Track, PlayerModule, Path
|
||||||
|
|
||||||
from typing import TextIO
|
|
||||||
_log_out: TextIO
|
|
||||||
|
|
||||||
assert _log_out # pyright: ignore[reportUnboundVariable]
|
|
||||||
|
|
||||||
MAIN_PATH_DIR = Path("/home/user/mixes")
|
MAIN_PATH_DIR = Path("/home/user/mixes")
|
||||||
|
|
||||||
@@ -107,18 +101,13 @@ async def _safe_send(ws, payload: str, clients: set):
|
|||||||
try: clients.discard(ws)
|
try: clients.discard(ws)
|
||||||
except Exception: pass
|
except Exception: pass
|
||||||
|
|
||||||
def websocket_server_process(shared_data: dict, imc_q: multiprocessing.Queue, ws_q: multiprocessing.Queue, shutdown_evt: Event):
|
def websocket_server_process(shared_data: dict, imc_q: multiprocessing.Queue, ws_q: multiprocessing.Queue):
|
||||||
"""
|
"""
|
||||||
Entrypoint for the separate process that runs the asyncio-based websocket server.
|
Entrypoint for the separate process that runs the asyncio-based websocket server.
|
||||||
"""
|
"""
|
||||||
# create the asyncio loop and run server
|
# create the asyncio loop and run server
|
||||||
async def runner():
|
async def runner():
|
||||||
clients = set()
|
clients = set()
|
||||||
stop_evt = asyncio.Event()
|
|
||||||
|
|
||||||
async def shutdown_watcher():
|
|
||||||
await loop.run_in_executor(None, shutdown_evt.wait)
|
|
||||||
stop_evt.set()
|
|
||||||
|
|
||||||
async def handler_wrapper(websocket: ServerConnection):
|
async def handler_wrapper(websocket: ServerConnection):
|
||||||
# register client
|
# register client
|
||||||
@@ -144,20 +133,10 @@ def websocket_server_process(shared_data: dict, imc_q: multiprocessing.Queue, ws
|
|||||||
b"WebSocket upgrade required\n"
|
b"WebSocket upgrade required\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
watcher = asyncio.create_task(shutdown_watcher())
|
|
||||||
server = await websockets.serve(handler_wrapper, "0.0.0.0", 3001, server_header="RadioPlayer ws plugin", process_request=process_request)
|
server = await websockets.serve(handler_wrapper, "0.0.0.0", 3001, server_header="RadioPlayer ws plugin", process_request=process_request)
|
||||||
broadcaster = asyncio.create_task(broadcast_worker(ws_q, clients))
|
broadcaster = asyncio.create_task(broadcast_worker(ws_q, clients))
|
||||||
|
await broadcaster
|
||||||
await stop_evt.wait()
|
|
||||||
close_tasks = [ws.close(1001, "Server shutting down") for ws in list(clients)]
|
|
||||||
await asyncio.gather(*close_tasks, return_exceptions=True)
|
|
||||||
clients.clear()
|
|
||||||
server.close()
|
|
||||||
await server.wait_closed()
|
await server.wait_closed()
|
||||||
watcher.cancel()
|
|
||||||
broadcaster.cancel()
|
|
||||||
try: await asyncio.wait_for(asyncio.gather(watcher, broadcaster, return_exceptions=True), timeout=2.0)
|
|
||||||
except asyncio.TimeoutError: pass
|
|
||||||
|
|
||||||
loop = asyncio.new_event_loop()
|
loop = asyncio.new_event_loop()
|
||||||
asyncio.set_event_loop(loop)
|
asyncio.set_event_loop(loop)
|
||||||
@@ -167,8 +146,6 @@ def websocket_server_process(shared_data: dict, imc_q: multiprocessing.Queue, ws
|
|||||||
|
|
||||||
class Module(PlayerModule):
|
class Module(PlayerModule):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.logger = log95.log95("WEB", output=_log_out)
|
|
||||||
|
|
||||||
self.manager = multiprocessing.Manager()
|
self.manager = multiprocessing.Manager()
|
||||||
self.data = self.manager.dict()
|
self.data = self.manager.dict()
|
||||||
self.imc_q = multiprocessing.Queue()
|
self.imc_q = multiprocessing.Queue()
|
||||||
@@ -182,8 +159,7 @@ class Module(PlayerModule):
|
|||||||
self.ipc_thread = threading.Thread(target=self._ipc_worker, daemon=True)
|
self.ipc_thread = threading.Thread(target=self._ipc_worker, daemon=True)
|
||||||
self.ipc_thread.start()
|
self.ipc_thread.start()
|
||||||
|
|
||||||
self.shutdown_evt = multiprocessing.Event()
|
self.ws_process = multiprocessing.Process(target=websocket_server_process, args=(self.data, self.imc_q, self.ws_q), daemon=False)
|
||||||
self.ws_process = multiprocessing.Process(target=websocket_server_process, args=(self.data, self.imc_q, self.ws_q, self.shutdown_evt), daemon=False)
|
|
||||||
self.ws_process.start()
|
self.ws_process.start()
|
||||||
if os.name == "posix":
|
if os.name == "posix":
|
||||||
try: os.setpgid(self.ws_process.pid, self.ws_process.pid)
|
try: os.setpgid(self.ws_process.pid, self.ws_process.pid)
|
||||||
@@ -237,14 +213,11 @@ class Module(PlayerModule):
|
|||||||
except Exception: pass
|
except Exception: pass
|
||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
self.logger.info("Shutting down...")
|
|
||||||
self.ipc_thread_running = False
|
self.ipc_thread_running = False
|
||||||
|
|
||||||
try: self.imc_q.put(None, block=False)
|
try: self.imc_q.put(None, block=False)
|
||||||
except: pass
|
except: pass
|
||||||
|
|
||||||
self.shutdown_evt.set()
|
|
||||||
|
|
||||||
try: self.ws_q.put(None, block=False)
|
try: self.ws_q.put(None, block=False)
|
||||||
except: pass
|
except: pass
|
||||||
|
|
||||||
@@ -258,6 +231,5 @@ class Module(PlayerModule):
|
|||||||
if self.ws_process.is_alive():
|
if self.ws_process.is_alive():
|
||||||
self.ws_process.kill()
|
self.ws_process.kill()
|
||||||
self.ws_process.join(timeout=1)
|
self.ws_process.join(timeout=1)
|
||||||
self.logger.info(f"Stopped the ws process ({self.ws_process.is_alive()=} {self.ipc_thread.is_alive()=} {self.ipc_thread_running=})")
|
|
||||||
|
|
||||||
module = Module()
|
module = Module()
|
||||||
|
|||||||
@@ -122,7 +122,7 @@ class ModuleManager:
|
|||||||
for module in self.simple_modules:
|
for module in self.simple_modules:
|
||||||
if module:
|
if module:
|
||||||
try: module.shutdown()
|
try: module.shutdown()
|
||||||
except Exception: traceback.print_exc(file=self.logger.output)
|
except BaseException: traceback.print_exc(file=self.logger.output)
|
||||||
def load_modules(self):
|
def load_modules(self):
|
||||||
"""Loads the modules into memory"""
|
"""Loads the modules into memory"""
|
||||||
for file in MODULES_DIR.glob("*"):
|
for file in MODULES_DIR.glob("*"):
|
||||||
@@ -198,6 +198,7 @@ class RadioPlayer:
|
|||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
if self.procman: self.procman.stop_all()
|
if self.procman: self.procman.stop_all()
|
||||||
|
self.modman.shutdown_modules()
|
||||||
self.logger.output.close()
|
self.logger.output.close()
|
||||||
|
|
||||||
def handle_sigint(self, signum: int, frame: types.FrameType | None):
|
def handle_sigint(self, signum: int, frame: types.FrameType | None):
|
||||||
|
|||||||
Reference in New Issue
Block a user