0
1
mirror of https://github.com/radio95-rnt/RadioPlayer.git synced 2026-02-26 21:53:54 +01:00

this is a beast

This commit is contained in:
KubaPro010
2025-11-12 22:05:37 +01:00
parent 76325d0dfa
commit 5fd56bfa41
5 changed files with 207 additions and 209 deletions

View File

@@ -2,10 +2,10 @@ from . import ActiveModifier, log95, Track, Path
import os, glob, datetime import os, glob, datetime
from typing import TextIO from typing import TextIO
_log_file: TextIO _log_out: TextIO
assert _log_file # pyright: ignore[reportUnboundVariable] assert _log_out # pyright: ignore[reportUnboundVariable]
logger = log95.log95("AC-MOD", output=_log_file) logger = log95.log95("AC-MOD", output=_log_out)
class Module(ActiveModifier): class Module(ActiveModifier):
def __init__(self) -> None: def __init__(self) -> None:

View File

@@ -10,10 +10,10 @@ from . import PlaylistAdvisor, log95, Path
import os, datetime import os, datetime
from typing import TextIO from typing import TextIO
_log_file: TextIO _log_out: TextIO
assert _log_file # pyright: ignore[reportUnboundVariable] assert _log_out # pyright: ignore[reportUnboundVariable]
logger = log95.log95("ADVISOR", output=_log_file) logger = log95.log95("ADVISOR", output=_log_out)
playlist_dir = Path("/home/user/playlists") playlist_dir = Path("/home/user/playlists")

View File

@@ -11,11 +11,11 @@ rds_default_artist = "radio95"
udp_host = ("127.0.0.1", 5000) udp_host = ("127.0.0.1", 5000)
from typing import TextIO from typing import TextIO
_log_file: TextIO _log_out: TextIO
logger_level = log95.log95Levels.DEBUG if DEBUG else log95.log95Levels.CRITICAL_ERROR logger_level = log95.log95Levels.DEBUG if DEBUG else log95.log95Levels.CRITICAL_ERROR
assert _log_file # pyright: ignore[reportUnboundVariable] assert _log_out # pyright: ignore[reportUnboundVariable]
logger = log95.log95("RDS-MODULE", logger_level, output=_log_file) logger = log95.log95("RDS-MODULE", logger_level, output=_log_out)
def load_dict_from_custom_format(file_path: str) -> dict[str, str]: def load_dict_from_custom_format(file_path: str) -> dict[str, str]:
try: try:

View File

@@ -2,10 +2,10 @@ from . import PlayerModule, log95, Track
import os import os
from typing import TextIO from typing import TextIO
_log_file: TextIO _log_out: TextIO
assert _log_file # pyright: ignore[reportUnboundVariable] assert _log_out # pyright: ignore[reportUnboundVariable]
logger = log95.log95("PlayView", output=_log_file) logger = log95.log95("PlayView", output=_log_out)
class Module(PlayerModule): class Module(PlayerModule):
def __init__(self) -> None: def __init__(self) -> None:

View File

@@ -1,8 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import time import time
import os, subprocess, importlib.util, types import os, subprocess, importlib.util, types
import sys, signal, threading, glob import sys, signal, threading, glob
import libcache, traceback import libcache, traceback, atexit
from modules import * from modules import *
def prefetch(path): def prefetch(path):
@@ -13,25 +14,9 @@ def prefetch(path):
os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_NOREUSE) os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_NOREUSE)
os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_WILLNEED) os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_WILLNEED)
simple_modules: list[PlayerModule] = []
playlist_modifier_modules: list[PlaylistModifierModule] = []
playlist_advisor: PlaylistAdvisor | None = None
active_modifier: ActiveModifier | None = None
MODULES_PACKAGE = "modules" MODULES_PACKAGE = "modules"
MODULES_DIR = Path(__file__, "..", MODULES_PACKAGE).resolve() MODULES_DIR = Path(__file__, "..", MODULES_PACKAGE).resolve()
log_file_path = Path("/tmp/radioPlayer_log")
if log_file_path.exists(): log_file_path.unlink()
log_file_path.touch()
log_file = open(log_file_path, "w")
logger = log95.log95("CORE", output=log_file)
exit_pending = False
exit_status_code = 0
intr_time = 0
exit_lock = threading.Lock()
class ProcessManager(Skeleton_ProcessManager): class ProcessManager(Skeleton_ProcessManager):
def __init__(self) -> None: def __init__(self) -> None:
self.lock = threading.Lock() self.lock = threading.Lock()
@@ -83,219 +68,232 @@ class ProcessManager(Skeleton_ProcessManager):
except subprocess.TimeoutExpired: process.process.terminate() except subprocess.TimeoutExpired: process.process.terminate()
self.processes.clear() self.processes.clear()
procman = ProcessManager() class PlaylistParser:
def __init__(self, output: log95.TextIO) -> types.NoneType: self.logger = log95.log95("PARSER", output=output)
def handle_sigint(signum, frame): def load_filelines(self, path: Path):
global exit_pending, intr_time, exit_status_code try:
with exit_lock: return [line.strip() for line in path.read_text().splitlines() if line.strip()]
logger.info("Received SIGINT") except FileNotFoundError:
if (time.monotonic() - intr_time) > 5: self.logger.error(f"Playlist not found: {path.name}")
intr_time = time.monotonic() return []
logger.info("Will quit on song end.")
exit_pending = True def _check_for_imports(self, path: Path, seen=None) -> list[str]:
exit_status_code = 130
else:
logger.warning("Force-Quit pending")
procman.stop_all()
raise SystemExit(130)
signal.signal(signal.SIGINT, handle_sigint)
def load_filelines(path: Path):
try:
return [line.strip() for line in path.read_text().splitlines() if line.strip()]
except FileNotFoundError:
logger.error(f"Playlist not found: {path.name}")
return []
def parse_playlistfile(playlist_path: Path) -> tuple[dict[str, str], list[tuple[list[str], dict[str, str]]]]:
lines = load_filelines(playlist_path)
def check_for_imports(lines: list[str], seen=None) -> list[str]:
if seen is None: seen = set() if seen is None: seen = set()
lines = self.load_filelines(path)
out = [] out = []
for line in lines: for line in lines:
line = line.strip()
if line.startswith("@"): if line.startswith("@"):
target = Path(line.removeprefix("@")) target = Path(line.removeprefix("@"))
if target not in seen: if target not in seen:
if not target.exists(): if not target.exists():
logger.error(f"Target {target.name} of {playlist_path.name} does not exist") self.logger.error(f"Target {target.name} of {path.name} does not exist")
continue continue
seen.add(target) seen.add(target)
out.extend(check_for_imports(load_filelines(target), seen)) out.extend(self._check_for_imports(target, seen))
else: out.append(line) else: out.append(line)
return out return out
lines = check_for_imports(lines) # First, import everything
out = [] def parse_playlistfile(self, playlist_path: Path) -> tuple[dict[str, str], list[tuple[list[str], dict[str, str]]]]:
global_arguments = {} lines = self._check_for_imports(playlist_path) # First, import everything
for line in lines: out = []
arguments = {} global_arguments = {}
line = line.strip() for line in lines:
if not line or line.startswith(";") or line.startswith("#"): continue arguments = {}
if "|" in line: line = line.strip()
if line.startswith("|"): # No file name, we're defining global arguments if not line or line.startswith(";") or line.startswith("#"): continue
args = line.removeprefix("|").split(";") if "|" in line:
for arg in args: if line.startswith("|"): # No file name, we're defining global arguments
key, val = arg.split("=", 1) args = line.removeprefix("|").split(";")
global_arguments[key] = val for arg in args:
key, val = arg.split("=", 1)
global_arguments[key] = val
else:
line, args = line.split("|", 1)
args = args.split(";")
for arg in args:
key, val = arg.split("=", 1)
arguments[key] = val
out.append(([f for f in glob.glob(line) if os.path.isfile(f)], arguments))
return global_arguments, out
class RadioPlayer:
def __init__(self, output: log95.TextIO):
self.simple_modules: list[PlayerModule] = []
self.playlist_modifier_modules: list[PlaylistModifierModule] = []
self.playlist_advisor: PlaylistAdvisor | None = None
self.active_modifier: ActiveModifier | None = None
self.logger = log95.log95("CORE", output=output)
self.exit_pending = False
self.exit_status_code = 0
self.intr_time = 0
self.exit_lock = threading.Lock()
self.procman = ProcessManager()
self.modules: list[tuple] = []
self.parser = PlaylistParser(output)
def shutdown(self): self.procman.stop_all()
def handle_sigint(self, signum, frame):
with self.exit_lock:
self.logger.info("Received CTRL+C (SIGINT)")
if (time.monotonic() - self.intr_time) > 5:
self.intr_time = time.monotonic()
self.logger.info("Will quit on song end.")
self.exit_pending = True
self.exit_status_code = 130
else: else:
line, args = line.split("|", 1) self.logger.warning("Force-Quit pending")
args = args.split(";") raise SystemExit(130)
for arg in args:
key, val = arg.split("=", 1)
arguments[key] = val
out.append(([f for f in glob.glob(line) if os.path.isfile(f)], arguments))
return global_arguments, out
def play_playlist(playlist_path: Path, starting_index: int = 0): def load_modules(self):
assert playlist_advisor for file in MODULES_DIR.glob("*"):
if file.name.endswith(".py") and file.name != "__init__.py":
module_name = file.name[:-3]
full_module_name = f"{MODULES_PACKAGE}.{module_name}"
try: global_args, parsed = parse_playlistfile(playlist_path) spec = importlib.util.spec_from_file_location(full_module_name, Path(MODULES_DIR, file))
except Exception as e: assert spec
logger.info(f"Exception ({e}) while parsing playlist, retrying in 15 seconds...") module = importlib.util.module_from_spec(spec)
time.sleep(15)
return
playlist: list[Track] = [] sys.modules[full_module_name] = module
[playlist.extend(Track(Path(line).absolute(), True, True, True, args) for line in lns) for (lns, args) in parsed] # i can read this, i think if MODULES_PACKAGE not in sys.modules:
parent = types.ModuleType(MODULES_PACKAGE)
parent.__path__ = [str(MODULES_DIR)]
parent.__package__ = MODULES_PACKAGE
sys.modules[MODULES_PACKAGE] = parent
module.__package__ = MODULES_PACKAGE
for module in playlist_modifier_modules: playlist = module.modify(global_args, playlist) or playlist # id one liner this but the assignement is stopping me module._log_out = self.logger.output # type: ignore
module.__dict__['_log_out'] = self.logger.output
self.modules.append((spec, module, module_name))
def start_modules(self):
for (spec, module, module_name) in self.modules:
assert spec.loader
try:
start = time.monotonic()
time.perf_counter()
spec.loader.exec_module(module)
time_took = time.monotonic() - start
if time_took > 0.2: self.logger.warning(f"{module_name} took {time_took:.2f}s to start")
except Exception as e:
traceback.print_exc(file=self.logger.output)
self.logger.error(f"Failed loading {module_name} due to {e}, continuing")
continue
prefetch(playlist[0].path) if md := getattr(module, "module", None):
if isinstance(md, list): self.simple_modules.extend(md)
else: self.simple_modules.append(md)
if md := getattr(module, "playlistmod", None):
if isinstance(md, tuple):
md, index = md
if isinstance(md, list): self.playlist_modifier_modules[index:index] = md
else: self.playlist_modifier_modules.insert(index, md)
elif isinstance(md, list): self.playlist_modifier_modules.extend(md)
else: self.playlist_modifier_modules.append(md)
if md := getattr(module, "advisor", None):
if self.playlist_advisor: raise Exception("Multiple playlist advisors")
self.playlist_advisor = md
if md := getattr(module, "activemod", None):
if self.active_modifier: raise Exception("Multiple active modifiers")
self.active_modifier = md
InterModuleCommunication(self.simple_modules + [self.playlist_advisor, ProcmanCommunicator(self.procman), self.active_modifier])
[mod.on_new_playlist(playlist) for mod in simple_modules + [active_modifier] if mod] # one liner'd everything def start(self):
self.logger.info("Core starting, loading modules")
self.load_modules();self.start_modules()
if not self.playlist_advisor:
self.logger.critical_error("Playlist advisor was not found")
raise SystemExit(1)
return_pending = False def play_playlist(self, playlist_path: Path, starting_index: int = 0):
assert self.playlist_advisor
cross_fade = int(global_args.get("crossfade", 5)) try: global_args, parsed = self.parser.parse_playlistfile(playlist_path)
except Exception as e:
max_iterator = len(playlist) self.logger.info(f"Exception ({e}) while parsing playlist, retrying in 15 seconds...")
song_i = i = starting_index time.sleep(15)
while i < max_iterator:
if exit_pending:
logger.info("Quit received, waiting for song end.")
procman.wait_all()
raise SystemExit(exit_status_code)
elif return_pending:
logger.info("Return reached, next song will reload the playlist.")
procman.wait_all()
return return
if playlist_advisor.new_playlist(): playlist: list[Track] = []
logger.info("Reloading now...") [playlist.extend(Track(Path(line).absolute(), True, True, True, args) for line in lns) for (lns, args) in parsed] # i can read this, i think
return_pending = True
continue
track = playlist[song_i % len(playlist)] for module in self.playlist_modifier_modules: playlist = module.modify(global_args, playlist) or playlist # id one liner this but the assignement is stopping me
next_track = playlist[song_i + 1] if song_i + 1 < len(playlist) else None prefetch(playlist[0].path)
if active_modifier: [mod.on_new_playlist(playlist) for mod in self.simple_modules + [self.active_modifier] if mod] # one liner'd everything
(track, next_track), extend = active_modifier.play(song_i, track, next_track)
if track is None: return_pending = False
song_i += 1
cross_fade = int(global_args.get("crossfade", 5))
max_iterator = len(playlist)
song_i = i = starting_index
while i < max_iterator:
if self.exit_pending:
self.logger.info("Quit received, waiting for song end.")
self.procman.wait_all()
raise SystemExit(self.exit_status_code)
elif return_pending:
self.logger.info("Return reached, next song will reload the playlist.")
self.procman.wait_all()
return
if self.playlist_advisor.new_playlist():
self.logger.info("Reloading now...")
return_pending = True
continue continue
if extend: max_iterator += 1
else: extend = False
prefetch(track.path) track = playlist[song_i % len(playlist)]
next_track = playlist[song_i + 1] if song_i + 1 < len(playlist) else None
if self.active_modifier:
(track, next_track), extend = self.active_modifier.play(song_i, track, next_track)
if track is None:
song_i += 1
continue
if extend: max_iterator += 1
else: extend = False
logger.info(f"Now playing: {track.path.name}") prefetch(track.path)
self.logger.info(f"Now playing: {track.path.name}")
for module in simple_modules: module.on_new_track(song_i, track, next_track) for module in self.simple_modules: module.on_new_track(song_i, track, next_track)
pr = procman.play(track, cross_fade) pr = self.procman.play(track, cross_fade)
ttw = pr.duration
if track.fade_out: ttw -= cross_fade
end_time = pr.started_at + ttw
ttw = pr.duration if next_track: prefetch(next_track.path)
if track.fade_out: ttw -= cross_fade
end_time = pr.started_at + ttw while end_time >= time.monotonic() and pr.process.poll() is None:
start = time.monotonic()
[module.progress(song_i, track, time.monotonic() - pr.started_at, pr.duration, ttw) for module in self.simple_modules if module]
elapsed = time.monotonic() - start
remaining_until_end = end_time - time.monotonic()
if elapsed < 1 and remaining_until_end > 0: time.sleep(min(1 - elapsed, remaining_until_end))
if next_track: prefetch(next_track.path) if next_track: prefetch(next_track.path)
i += 1
if not extend: song_i += 1
while end_time >= time.monotonic() and pr.process.poll() is None: def loop(self):
start = time.monotonic() assert self.playlist_advisor
self.logger.info("Starting playback.")
[module.progress(song_i, track, time.monotonic() - pr.started_at, pr.duration, ttw) for module in simple_modules if module] try:
arg = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else None
elapsed = time.monotonic() - start if self.active_modifier: self.active_modifier.arguments(arg)
remaining_until_end = end_time - time.monotonic() while True:
if elapsed < 1 and remaining_until_end > 0: time.sleep(min(1 - elapsed, remaining_until_end)) if playlist := self.playlist_advisor.advise(arg): self.play_playlist(playlist)
if self.exit_pending: raise SystemExit(self.exit_status_code)
if next_track: prefetch(next_track.path) except Exception as e:
self.logger.critical_error(f"Unexpected error: {e}")
i += 1 raise
if not extend: song_i += 1
def main(): def main():
logger.info("Core is starting, loading modules") log_file_path = Path("/tmp/radioPlayer_log")
global playlist_advisor, active_modifier log_file_path.touch()
modules: list[tuple] = [] log_file = open(log_file_path, "w")
for file in MODULES_DIR.glob("*"):
if file.name.endswith(".py") and file.name != "__init__.py":
module_name = file.name[:-3]
full_module_name = f"{MODULES_PACKAGE}.{module_name}"
spec = importlib.util.spec_from_file_location(full_module_name, Path(MODULES_DIR, file)) core = RadioPlayer(log_file)
if not spec: continue atexit.register(core.shutdown)
module = importlib.util.module_from_spec(spec) core.start()
signal.signal(signal.SIGINT, core.handle_sigint)
sys.modules[full_module_name] = module try: core.loop()
finally: log_file.close()
if MODULES_PACKAGE not in sys.modules:
parent = types.ModuleType(MODULES_PACKAGE)
parent.__path__ = [str(MODULES_DIR)]
parent.__package__ = MODULES_PACKAGE
sys.modules[MODULES_PACKAGE] = parent
module.__package__ = MODULES_PACKAGE
module._log_file = log_file # type: ignore
module.__dict__['_log_file'] = log_file
modules.append((spec, module, module_name))
for (spec, module, module_name) in modules:
if not spec.loader: continue
try: spec.loader.exec_module(module)
except Exception as e:
traceback.print_exc()
logger.error(f"Failed loading {module_name} due to {e}")
continue
if md := getattr(module, "module", None):
if isinstance(md, list): simple_modules.extend(md)
else: simple_modules.append(md)
if md := getattr(module, "playlistmod", None):
if isinstance(md, tuple):
md, index = md
if isinstance(md, list): playlist_modifier_modules[index:index] = md
else: playlist_modifier_modules.insert(index, md)
elif isinstance(md, list): playlist_modifier_modules.extend(md)
else: playlist_modifier_modules.append(md)
if md := getattr(module, "advisor", None):
if playlist_advisor: raise Exception("Multiple playlist advisors")
playlist_advisor = md
if md := getattr(module, "activemod", None):
if active_modifier: raise Exception("Multiple active modifiers")
active_modifier = md
if not playlist_advisor:
logger.critical_error("Playlist advisor was not found")
raise SystemExit(1)
InterModuleCommunication(simple_modules + [playlist_advisor, ProcmanCommunicator(procman), active_modifier])
logger.info("Starting playback.")
try:
arg = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else None
if active_modifier: active_modifier.arguments(arg)
while True:
if playlist := playlist_advisor.advise(arg):
logger.info(f"Advisor picked '{playlist}' to play")
play_playlist(playlist)
if exit_pending: raise SystemExit(exit_status_code)
except Exception as e:
logger.critical_error(f"Unexpected error: {e}")
raise
finally:
procman.stop_all()
log_file.close()