You've already forked RadioPlayer
mirror of
https://github.com/radio95-rnt/RadioPlayer.git
synced 2026-02-26 13:52:00 +01:00
something
This commit is contained in:
@@ -2,10 +2,11 @@ import log95
|
||||
from collections.abc import Sequence
|
||||
from subprocess import Popen
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
@dataclass
|
||||
class Track:
|
||||
path: str
|
||||
path: Path
|
||||
fade_out: bool
|
||||
fade_in: bool
|
||||
official: bool
|
||||
@@ -63,6 +64,8 @@ class ProcmanCommunicator(BaseIMCModule):
|
||||
return {"op": 2}
|
||||
elif int(op) == 3:
|
||||
return {"op": 3, "arg": self.procman.processes}
|
||||
elif int(op) == 4:
|
||||
return {"op": 4, "arg": self.procman.anything_playing()}
|
||||
|
||||
class PlayerModule(BaseIMCModule):
|
||||
"""
|
||||
@@ -97,11 +100,11 @@ class PlaylistAdvisor(BaseIMCModule):
|
||||
"""
|
||||
Only one of a playlist advisor can be loaded. This module picks the playlist file to play, this can be a scheduler or just a static file
|
||||
"""
|
||||
def advise(self, arguments: str | None) -> str | None:
|
||||
def advise(self, arguments: str | None) -> Path | None:
|
||||
"""
|
||||
Arguments are the arguments passed to the program on startup
|
||||
"""
|
||||
return "/path/to/playlist.txt"
|
||||
return Path("/path/to/playlist.txt")
|
||||
def new_playlist(self) -> bool:
|
||||
"""
|
||||
Whether to play a new playlist, if this is True, then the player will refresh and fetch a new playlist, calling advise
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
#!/usr/bin/env python3
|
||||
DEBUG = False
|
||||
import time
|
||||
import os, subprocess, importlib.util, types
|
||||
import sys, signal, threading, glob
|
||||
import libcache
|
||||
from pathlib import Path
|
||||
from modules import *
|
||||
|
||||
simple_modules: list[PlayerModule] = []
|
||||
@@ -13,10 +11,9 @@ playlist_advisor: PlaylistAdvisor | None = None
|
||||
active_modifier: ActiveModifier | None = None
|
||||
|
||||
MODULES_PACKAGE = "modules"
|
||||
MODULES_DIR = (Path(__file__).resolve().parent / MODULES_PACKAGE).resolve()
|
||||
MODULES_DIR = Path(__file__, "..", MODULES_PACKAGE).resolve()
|
||||
|
||||
logger_level = log95.log95Levels.DEBUG if DEBUG else log95.log95Levels.CRITICAL_ERROR
|
||||
logger = log95.log95("CORE", logger_level)
|
||||
logger = log95.log95("CORE")
|
||||
|
||||
exit_pending = False
|
||||
intr_time = 0
|
||||
@@ -27,20 +24,21 @@ class ProcessManager(Skeleton_ProcessManager):
|
||||
self.lock = threading.Lock()
|
||||
self.processes: list[Process] = []
|
||||
self.duration_cache = libcache.Cache([])
|
||||
def _get_audio_duration(self, file_path):
|
||||
if result := self.duration_cache.getElement(file_path, False): return result
|
||||
def _get_audio_duration(self, file_path: Path):
|
||||
if result := self.duration_cache.getElement(file_path.as_posix(), False): return result
|
||||
|
||||
result = subprocess.run(['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path], capture_output=True, text=True)
|
||||
result = subprocess.run(['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', str(file_path)], capture_output=True, text=True)
|
||||
if result.returncode == 0:
|
||||
result = float(result.stdout.strip())
|
||||
self.duration_cache.saveElement(file_path, result, (60*60), False, True)
|
||||
self.duration_cache.saveElement(file_path.as_posix(), result, (60*60), False, True)
|
||||
return result
|
||||
return None
|
||||
def play(self, track: Track, fade_time: int=5) -> Process:
|
||||
cmd = ['ffplay', '-nodisp', '-hide_banner', '-autoexit', '-loglevel', 'quiet']
|
||||
assert track.path.exists()
|
||||
|
||||
duration = self._get_audio_duration(track.path)
|
||||
if not duration: raise Exception("Failed to get file duration, does it actually exist?", track.path)
|
||||
duration = self._get_audio_duration(track.path.absolute())
|
||||
if not duration: raise Exception("Failed to get file duration for", track.path)
|
||||
if track.offset >= duration: track.offset = max(duration - 0.1, 0)
|
||||
if track.offset > 0: cmd.extend(['-ss', str(track.offset)])
|
||||
|
||||
@@ -49,10 +47,9 @@ class ProcessManager(Skeleton_ProcessManager):
|
||||
if track.fade_out: filters.append(f"afade=t=out:st={duration - fade_time - track.offset}:d={fade_time}")
|
||||
if filters: cmd.extend(['-af', ",".join(filters)])
|
||||
|
||||
cmd.append(track.path)
|
||||
cmd.append(str(track.path.absolute()))
|
||||
|
||||
proc = Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)
|
||||
pr = Process(proc, track.path, time.monotonic(), duration - track.offset)
|
||||
pr = Process(Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True), track.path.name, time.monotonic(), duration - track.offset)
|
||||
with self.lock: self.processes.append(pr)
|
||||
return pr
|
||||
def anything_playing(self) -> bool:
|
||||
@@ -89,30 +86,28 @@ def handle_sigint(signum, frame):
|
||||
raise SystemExit
|
||||
signal.signal(signal.SIGINT, handle_sigint)
|
||||
|
||||
def load_filelines(path):
|
||||
def load_filelines(path: Path):
|
||||
try:
|
||||
with open(path, 'r') as f: return [line.strip() for line in f.readlines() if line.strip()]
|
||||
return [line.strip() for line in path.read_text().splitlines() if line.strip()]
|
||||
except FileNotFoundError:
|
||||
logger.error(f"Playlist not found: {path}")
|
||||
logger.error(f"Playlist not found: {path.name}")
|
||||
return []
|
||||
|
||||
def parse_playlistfile(playlist_path: str) -> tuple[dict[str, str], list[tuple[list[str], dict[str, str]]]]:
|
||||
parser_log = log95.log95("PARSER", logger_level)
|
||||
|
||||
parser_log.debug("Reading", playlist_path)
|
||||
lines = load_filelines(os.path.abspath(playlist_path))
|
||||
def parse_playlistfile(playlist_path: Path) -> tuple[dict[str, str], list[tuple[list[str], dict[str, str]]]]:
|
||||
lines = load_filelines(playlist_path.absolute())
|
||||
def check_for_imports(lines: list[str], seen=None) -> list[str]:
|
||||
nonlocal parser_log
|
||||
if seen is None: seen = set()
|
||||
out = []
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if line.startswith("@"):
|
||||
target = line.removeprefix("@")
|
||||
target = Path(line.removeprefix("@"))
|
||||
if target not in seen:
|
||||
parser_log.debug("Importing", target)
|
||||
if not target.exists():
|
||||
logger.error(f"Target {target.name} of {playlist_path.name} does not exist")
|
||||
continue
|
||||
seen.add(target)
|
||||
sub_lines = load_filelines(os.path.abspath(target))
|
||||
sub_lines = load_filelines(target)
|
||||
out.extend(check_for_imports(sub_lines, seen))
|
||||
else: out.append(line)
|
||||
return out
|
||||
@@ -136,12 +131,11 @@ def parse_playlistfile(playlist_path: str) -> tuple[dict[str, str], list[tuple[l
|
||||
for arg in args:
|
||||
key, val = arg.split("=", 1)
|
||||
arguments[key] = val
|
||||
parser_log.debug("Line:", line, "| Global Args:", repr(global_arguments), "| Local args:", repr(arguments))
|
||||
out.append(([f for f in glob.glob(line) if os.path.isfile(f)], arguments))
|
||||
return global_arguments, out
|
||||
|
||||
def play_playlist(playlist_path, starting_index: int = 0):
|
||||
if not playlist_advisor: raise Exception("No playlist advisor") # not sure how we would get this, but it makes pylance shut its fucking mouth
|
||||
assert playlist_advisor
|
||||
|
||||
try: global_args, parsed = parse_playlistfile(playlist_path)
|
||||
except Exception as e:
|
||||
@@ -150,7 +144,7 @@ def play_playlist(playlist_path, starting_index: int = 0):
|
||||
return
|
||||
|
||||
playlist: list[Track] = []
|
||||
[playlist.extend(Track(line, True, True, True, args) for line in lns) for (lns, args) in parsed] # i can read this, i think
|
||||
[playlist.extend(Track(Path(line).absolute(), True, True, True, args) for line in lns) for (lns, args) in parsed] # i can read this, i think
|
||||
|
||||
for module in playlist_modifier_modules: playlist = module.modify(global_args, playlist) or playlist
|
||||
|
||||
@@ -167,7 +161,7 @@ def play_playlist(playlist_path, starting_index: int = 0):
|
||||
if exit_pending:
|
||||
logger.info("Quit received, waiting for song end.")
|
||||
procman.wait_all()
|
||||
raise SystemExit()
|
||||
raise SystemExit
|
||||
elif return_pending:
|
||||
logger.info("Return reached, next song will reload the playlist.")
|
||||
procman.wait_all()
|
||||
@@ -178,20 +172,16 @@ def play_playlist(playlist_path, starting_index: int = 0):
|
||||
return_pending = True
|
||||
continue
|
||||
|
||||
old_track = playlist[song_i % len(playlist)]
|
||||
track = playlist[song_i % len(playlist)]
|
||||
if active_modifier:
|
||||
track, extend = active_modifier.play(song_i, old_track)
|
||||
track, extend = active_modifier.play(song_i, track)
|
||||
if track is None:
|
||||
song_i += 1
|
||||
continue
|
||||
if extend: max_iterator += 1
|
||||
else:
|
||||
extend = False
|
||||
track = old_track
|
||||
else: extend = False
|
||||
|
||||
track_path = os.path.abspath(os.path.expanduser(track.path))
|
||||
|
||||
logger.info(f"Now playing: {os.path.basename(track_path)}")
|
||||
logger.info(f"Now playing: {track.path.name}")
|
||||
|
||||
for module in simple_modules: module.on_new_track(song_i, track)
|
||||
|
||||
@@ -205,7 +195,7 @@ def play_playlist(playlist_path, starting_index: int = 0):
|
||||
while end_time >= time.monotonic() and pr.process.poll() is None:
|
||||
start = time.monotonic()
|
||||
|
||||
for module in simple_modules: module.progress(song_i, track, time.monotonic() - pr.started_at, pr.duration, ttw)
|
||||
[module.progress(song_i, track, time.monotonic() - pr.started_at, pr.duration, ttw) for module in simple_modules if module]
|
||||
|
||||
elapsed = time.monotonic() - start
|
||||
remaining_until_end = end_time - time.monotonic()
|
||||
@@ -217,13 +207,12 @@ def play_playlist(playlist_path, starting_index: int = 0):
|
||||
def main():
|
||||
logger.info("Core is starting, loading modules")
|
||||
global playlist_advisor, active_modifier
|
||||
for filename in os.listdir(MODULES_DIR):
|
||||
if filename.endswith(".py") and filename != "__init__.py":
|
||||
module_name = filename[:-3]
|
||||
module_path = MODULES_DIR / filename
|
||||
for file in MODULES_DIR.glob("*"):
|
||||
if file.name.endswith(".py") and file.name != "__init__.py":
|
||||
module_name = file.name[:-3]
|
||||
full_module_name = f"{MODULES_PACKAGE}.{module_name}"
|
||||
|
||||
spec = importlib.util.spec_from_file_location(full_module_name, module_path)
|
||||
spec = importlib.util.spec_from_file_location(full_module_name, Path(MODULES_DIR, file))
|
||||
if not spec: continue
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
|
||||
@@ -234,7 +223,6 @@ def main():
|
||||
parent.__path__ = [str(MODULES_DIR)]
|
||||
parent.__package__ = MODULES_PACKAGE
|
||||
sys.modules[MODULES_PACKAGE] = parent
|
||||
|
||||
module.__package__ = MODULES_PACKAGE
|
||||
|
||||
if not spec.loader: continue
|
||||
|
||||
Reference in New Issue
Block a user