You've already forked RadioPlayer
mirror of
https://github.com/radio95-rnt/RadioPlayer.git
synced 2026-02-26 21:53:54 +01:00
shutdown
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
import multiprocessing, os
|
import multiprocessing, os
|
||||||
from multiprocessing.synchronize import Event
|
from multiprocessing.synchronize import Event
|
||||||
|
from queue import Empty
|
||||||
import json
|
import json
|
||||||
import threading, uuid, time
|
import threading, uuid, time
|
||||||
import asyncio
|
import asyncio
|
||||||
@@ -84,14 +85,15 @@ async def broadcast_worker(ws_q: multiprocessing.Queue, clients: set):
|
|||||||
"""
|
"""
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
while True:
|
while True:
|
||||||
msg = await loop.run_in_executor(None, ws_q.get)
|
try: msg = asyncio.wait_for(loop.run_in_executor(None, ws_q.get), 1.0)
|
||||||
|
except asyncio.TimeoutError: continue
|
||||||
if msg is None: break
|
if msg is None: break
|
||||||
payload = json.dumps(msg)
|
payload = json.dumps(msg)
|
||||||
if clients:
|
if clients:
|
||||||
coros = []
|
await asyncio.gather(
|
||||||
for ws in list(clients):
|
*[_safe_send(ws, payload, clients) for ws in list(clients)],
|
||||||
coros.append(_safe_send(ws, payload, clients))
|
return_exceptions=True
|
||||||
await asyncio.gather(*coros)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def _safe_send(ws, payload: str, clients: set):
|
async def _safe_send(ws, payload: str, clients: set):
|
||||||
@@ -117,7 +119,7 @@ def websocket_server_process(shared_data: dict, imc_q: multiprocessing.Queue, ws
|
|||||||
# register client
|
# register client
|
||||||
clients.add(websocket)
|
clients.add(websocket)
|
||||||
try: await ws_handler(websocket, shared_data, imc_q, ws_q)
|
try: await ws_handler(websocket, shared_data, imc_q, ws_q)
|
||||||
finally:
|
finally:
|
||||||
await websocket.close(1001, "")
|
await websocket.close(1001, "")
|
||||||
clients.discard(websocket)
|
clients.discard(websocket)
|
||||||
async def process_request(websocket: ServerConnection, request: Request):
|
async def process_request(websocket: ServerConnection, request: Request):
|
||||||
@@ -129,7 +131,7 @@ def websocket_server_process(shared_data: dict, imc_q: multiprocessing.Queue, ws
|
|||||||
Headers([("Content-Type", "text/html"), ("Content-Length", f"{len(data)}")]),
|
Headers([("Content-Type", "text/html"), ("Content-Length", f"{len(data)}")]),
|
||||||
data
|
data
|
||||||
)
|
)
|
||||||
if not "upgrade" in request.headers.get("Connection", "").lower():
|
if not "upgrade" in request.headers.get("Connection", "").lower():
|
||||||
return Response(
|
return Response(
|
||||||
426,
|
426,
|
||||||
"Upgrade Required",
|
"Upgrade Required",
|
||||||
@@ -142,13 +144,21 @@ def websocket_server_process(shared_data: dict, imc_q: multiprocessing.Queue, ws
|
|||||||
broadcaster = asyncio.create_task(broadcast_worker(ws_q, clients))
|
broadcaster = asyncio.create_task(broadcast_worker(ws_q, clients))
|
||||||
|
|
||||||
await stop_evt.wait()
|
await stop_evt.wait()
|
||||||
|
close_tasks = [ws.close(1001, "Server shutting down") for ws in list(clients)]
|
||||||
|
await asyncio.gather(*close_tasks, return_exceptions=True)
|
||||||
|
clients.clear()
|
||||||
server.close()
|
server.close()
|
||||||
ws_q.put(None)
|
ws_q.put(None)
|
||||||
await server.wait_closed()
|
await server.wait_closed()
|
||||||
watcher.cancel()
|
watcher.cancel()
|
||||||
broadcaster.cancel()
|
broadcaster.cancel()
|
||||||
await asyncio.gather(watcher, broadcaster, return_exceptions=True)
|
try:
|
||||||
return
|
await asyncio.wait_for(
|
||||||
|
asyncio.gather(watcher, broadcaster, return_exceptions=True),
|
||||||
|
timeout=2.0
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
|
||||||
loop = asyncio.new_event_loop()
|
loop = asyncio.new_event_loop()
|
||||||
asyncio.set_event_loop(loop)
|
asyncio.set_event_loop(loop)
|
||||||
@@ -185,10 +195,11 @@ class Module(PlayerModule):
|
|||||||
"""
|
"""
|
||||||
while self.ipc_thread_running:
|
while self.ipc_thread_running:
|
||||||
try:
|
try:
|
||||||
message: dict | None = self.imc_q.get()
|
message: dict | None = self.imc_q.get(timeout=0.5)
|
||||||
if message is None: break
|
if message is None: break
|
||||||
out = self._imc.send(self, message["name"], message["data"])
|
out = self._imc.send(self, message["name"], message["data"])
|
||||||
if key := message.get("key", None): self.data[key] = out
|
if key := message.get("key", None): self.data[key] = out
|
||||||
|
except Empty: continue
|
||||||
except Exception: pass
|
except Exception: pass
|
||||||
|
|
||||||
def on_new_playlist(self, playlist: list[Track], global_args: dict[str, str]) -> None:
|
def on_new_playlist(self, playlist: list[Track], global_args: dict[str, str]) -> None:
|
||||||
@@ -226,19 +237,20 @@ class Module(PlayerModule):
|
|||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
self.ipc_thread_running = False
|
self.ipc_thread_running = False
|
||||||
self.imc_q.put(None)
|
|
||||||
|
try:self.imc_q.put(None)
|
||||||
|
except: pass
|
||||||
|
|
||||||
self.ipc_thread.join(timeout=2)
|
self.ipc_thread.join(timeout=2)
|
||||||
|
|
||||||
self.shutdown_evt.set()
|
self.shutdown_evt.set()
|
||||||
|
|
||||||
self.ws_process.join(timeout=5)
|
self.ws_process.join(timeout=5)
|
||||||
|
|
||||||
if self.ws_process.is_alive():
|
if self.ws_process.is_alive():
|
||||||
self.ws_process.terminate()
|
self.ws_process.terminate()
|
||||||
self.ws_process.join(timeout=2)
|
self.ws_process.join(timeout=2)
|
||||||
|
|
||||||
while self.ws_process.is_alive():
|
if self.ws_process.is_alive():
|
||||||
self.ws_process.kill()
|
self.ws_process.kill()
|
||||||
self.ws_process.close()
|
self.ws_process.join(timeout=1)
|
||||||
|
|
||||||
module = Module()
|
module = Module()
|
||||||
|
|||||||
Reference in New Issue
Block a user