0
1
mirror of https://github.com/radio95-rnt/RadioPlayer.git synced 2026-02-26 21:53:54 +01:00

massive clean ups (i wonder if it starts)

This commit is contained in:
Kuba
2025-10-31 18:04:37 +01:00
parent 2247e7996c
commit 1f4a184418
8 changed files with 130 additions and 84 deletions

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
DEBUG = False
import time
import os, subprocess, importlib.util
import os, subprocess, importlib.util, types
import sys, signal, threading, glob
import unidecode
from dataclasses import dataclass
@@ -14,10 +14,8 @@ playlist_modifier_modules: list[PlaylistModifierModule] = []
playlist_advisor: PlaylistAdvisor | None = None
active_modifier: ActiveModifier | None = None
SCRIPT_DIR = Path(__file__).resolve().parent
MODULES_PACKAGE = "modules"
MODULES_DIR = SCRIPT_DIR / MODULES_PACKAGE
MODULES_DIR = MODULES_DIR.resolve()
MODULES_DIR = (Path(__file__).resolve().parent / MODULES_PACKAGE).resolve()
def print_wait(ttw: float, frequency: float, duration: float=-1, prefix: str="", bias: float = 0):
interval = 1.0 / frequency
@@ -101,11 +99,10 @@ procman = ProcessManager()
def handle_sigint(signum, frame):
global exit_pending, intr_time
logger.info("Received SIGINT")
if (time.time() - intr_time) > 10:
if (time.time() - intr_time) > 5:
intr_time = time.time()
logger.info("Will quit on song end.")
exit_pending = True
return
else:
logger.warning("Force-Quit pending")
procman.stop_all()
@@ -120,7 +117,7 @@ def load_filelines(path):
logger.error(f"Playlist not found: {path}")
return []
def parse_playlistfile(playlist_path: str):
def parse_playlistfile(playlist_path: str) -> tuple[dict[str, str], list[tuple[list[str], dict[str, str]]]]:
parser_log = log95.log95("PARSER", logger_level)
parser_log.debug("Reading", playlist_path)
@@ -165,18 +162,15 @@ def parse_playlistfile(playlist_path: str):
def play_playlist(playlist_path):
if not playlist_advisor: raise Exception("No playlist advisor")
try:
global_args, parsed = parse_playlistfile(playlist_path)
try: global_args, parsed = parse_playlistfile(playlist_path)
except Exception:
logger.info(f"Exception while parsing playlist, retrying in 15 seconds...")
time.sleep(15)
return
playlist: list[tuple[str, bool, bool, bool, dict]] = [] # name, fade in, fade out, official, args
playlist: list[Track] = []
for (lns, args) in parsed:
lns: list[str]
args: dict[str, str]
for line in lns: playlist.append((line, True, True, True, args)) # simple entry, just to convert to a format taken by the modules
for line in lns: playlist.append(Track(line, True, True, True, args)) # simple entry, just to convert to a format taken by the modules
for module in playlist_modifier_modules: playlist = module.modify(global_args, playlist)
for module in simple_modules: module.on_new_playlist(playlist)
@@ -187,8 +181,7 @@ def play_playlist(playlist_path):
cross_fade = int(global_args.get("crossfade", 5))
max_iterator = len(playlist)
i = 0
song_i = 0
song_i = i = 0
while i < max_iterator:
if exit_pending:
@@ -199,37 +192,36 @@ def play_playlist(playlist_path):
logger.info("Return reached, next song will reload the playlist.")
procman.wait_all()
return
if playlist_advisor.new_playlist():
logger.info("Reloading now...")
return_pending = True
continue
old_track_tuple = playlist[song_i % len(playlist)]
old_track = playlist[song_i % len(playlist)]
if active_modifier:
track_tuple, extend = active_modifier.play(song_i, old_track_tuple)
if track_tuple is None:
track, extend = active_modifier.play(song_i, old_track)
if track is None:
song_i += 1
continue
if extend: max_iterator += 1
else:
extend = False
track_tuple = old_track_tuple
track, to_fade_in, to_fade_out, official, args = track_tuple
track = old_track
track_path = os.path.abspath(os.path.expanduser(track))
track_path = os.path.abspath(os.path.expanduser(track.path))
track_name = os.path.basename(track_path)
for module in simple_modules: module.on_new_track(song_i, track_path, to_fade_in, to_fade_out, official)
logger.info(f"Now playing: {track_name}")
pr = procman.play(track_path, to_fade_in, to_fade_out, cross_fade)
for module in simple_modules: module.on_new_track(song_i, track)
pr = procman.play(track_path, track.fade_in, track.fade_out, cross_fade)
ttw = pr.duration
if to_fade_out: ttw -= cross_fade
if track.fade_out: ttw -= cross_fade
if official: print_wait(ttw, 1, pr.duration, f"{track_name}: ")
if track.official: print_wait(ttw, 1, pr.duration, f"{track_name}: ")
else: time.sleep(ttw)
i += 1
@@ -250,7 +242,6 @@ def main():
sys.modules[full_module_name] = module
if MODULES_PACKAGE not in sys.modules:
import types
parent = types.ModuleType(MODULES_PACKAGE)
parent.__path__ = [str(MODULES_DIR)]
parent.__package__ = MODULES_PACKAGE
@@ -281,9 +272,8 @@ def main():
if not playlist_advisor:
logger.critical_error("Playlist advisor was not found")
exit(1)
imc = InterModuleCommunication(playlist_advisor, active_modifier, simple_modules)
imc = InterModuleCommunication(playlist_advisor, active_modifier, simple_modules)
playlist_advisor.imc(imc)
if active_modifier: active_modifier.imc(imc)
for module in simple_modules: module.imc(imc)
@@ -292,9 +282,9 @@ def main():
arg = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else None
if active_modifier: active_modifier.arguments(arg)
while True:
playlist = playlist_advisor.advise(arg)
logger.info(f"Advisor picked '{playlist}' to play")
play_playlist(playlist)
if playlist := playlist_advisor.advise(arg):
logger.info(f"Advisor picked '{playlist}' to play")
play_playlist(playlist)
if exit_pending: exit()
except Exception as e:
logger.error(f"Unexpected error: {e}")