0
1
mirror of https://github.com/radio95-rnt/RadioPlayer.git synced 2026-02-26 21:53:54 +01:00
Files
RadioPlayer/radioPlayer.py
2025-10-11 15:54:28 +02:00

457 lines
18 KiB
Python

#!/usr/bin/env python3
DEBUG = False
import time, datetime
import os, subprocess, importlib.util
import sys, signal, threading, glob
import re, unidecode
import random
import socket
from dataclasses import dataclass
import log95, copy
from pathlib import Path
from player_modules import PlayerModule
simple_modules: list[PlayerModule] = []
SCRIPT_DIR = Path(__file__).resolve().parent
MODULES_DIR = SCRIPT_DIR / ".." / "modules"
MODULES_DIR = MODULES_DIR.resolve()
def print_wait(ttw: float, frequency: float, duration: float=-1, prefix: str="", bias: float = 0):
interval = 1.0 / frequency
elapsed = 0.0
if duration == -1: duration = ttw
def format_time(seconds):
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
try:
while elapsed < ttw:
print(f"{prefix}{format_time(elapsed+bias)} / {format_time(duration)}", end="\r", flush=True)
time.sleep(interval)
elapsed += interval
except Exception:
print()
raise
print(f"{prefix}{format_time(ttw+bias)} / {format_time(duration)}")
MORNING_START = 5
MORNING_END = 11
DAY_START = 11
DAY_END = 19
LATE_NIGHT_START = 0
LATE_NIGHT_END = 5
JINGIEL_FILE = "/home/user/Jingiel.mp3"
playlist_dir = "/home/user/playlists"
name_table_path = "/home/user/mixes/name_table.txt"
rds_base = "Gramy: {} - {}"
rds_default_artist = "radio95"
rds_default_name = "Program Godzinny"
udp_host = ("127.0.0.1", 5000)
logger_level = log95.log95Levels.DEBUG if DEBUG else log95.log95Levels.CRITICAL_ERROR
logger = log95.log95("radioPlayer", logger_level)
exit_pending = False
reload_pending = False
intr_time = 0
class Time:
@staticmethod
def get_day_hour(): return datetime.datetime.now().strftime('%A').lower(), datetime.datetime.now().hour
@staticmethod
def get_playlist_modification_time(playlist_path):
try: return os.path.getmtime(playlist_path)
except OSError: return 0
@dataclass
class Process:
process: subprocess.Popen
track: str
started_at: float
duration: float
class ProcessManager:
def __init__(self) -> None:
self.lock = threading.Lock()
self.processes: list[Process] = []
def _get_audio_duration(self, file_path):
result = subprocess.run(['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path], capture_output=True, text=True)
if result.returncode == 0: return float(result.stdout.strip())
return None
def play(self, track_path: str, fade_in: bool=False, fade_out: bool=False, fade_time: int = 5) -> Process:
cmd = ['ffplay', '-nodisp', '-hide_banner', '-autoexit', '-loglevel', 'quiet']
duration = self._get_audio_duration(track_path)
if not duration: raise Exception("Failed to get file duration, does it actually exit?", track_path)
filters = []
if fade_in: filters.append(f"afade=t=in:st=0:d={fade_time}")
if fade_out: filters.append(f"afade=t=out:st={duration-fade_time}:d={fade_time}")
if filters: cmd.extend(['-af', ",".join(filters)])
cmd.append(track_path)
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)
pr = Process(proc, track_path, time.time(), duration)
with self.lock: self.processes.append(pr)
return pr
def anything_playing(self) -> bool:
with self.lock:
for process in self.processes:
if process.process.poll() is not None: self.processes.remove(process)
return bool(self.processes)
def stop_all(self, timeout: float | None = None) -> None:
with self.lock:
for process in self.processes:
process.process.terminate()
try: process.process.wait(timeout)
except: process.process.kill()
self.processes.remove(process)
def wait_all(self, timeout: float | None = None) -> None:
with self.lock:
for process in self.processes:
process.process.wait(timeout)
self.processes.remove(process)
procman = ProcessManager()
def handle_sigint(signum, frame):
global exit_pending, intr_time
logger.info("Received SIGINT")
if (time.time() - intr_time) > 10:
intr_time = time.time()
logger.info("Will quit on song end.")
exit_pending = True
return
else:
logger.warning("Force-Quit pending")
procman.stop_all()
exit(0)
def handle_sighup(signum, frame):
global reload_pending
reload_pending = True
signal.signal(signal.SIGINT, handle_sigint)
signal.signal(signal.SIGHUP, handle_sighup) # type: ignore
def load_dict_from_custom_format(file_path: str) -> dict[str, str]:
try:
result_dict = {}
with open(file_path, 'r') as file:
for line in file:
if line.strip() == "" or line.startswith(";"): continue
key, value = line.split(':', 1)
result_dict[key.strip()] = value.strip()
return result_dict
except FileNotFoundError:
logger.error(f"{name_table_path} does not exist, or could not be accesed")
return {}
def process_for_rds(track_name: str):
name_table = load_dict_from_custom_format(name_table_path)
try:
name = name_table[track_name]
has_name = True
except KeyError:
has_name = False
name = track_name.rsplit(".", 1)[0]
name = re.sub(r'^\s*\d+\s*[-.]?\s*', '', name)
if " - " in name:
count = name.count(" - ")
while count != 1: # youtube reuploads, to avoid things like ilikedick123 - Micheal Jackson - Smooth Criminal
name = name.split(" - ", 1)[1]
count = name.count(" - ")
artist = name.split(" - ", 1)[0]
title = name.split(" - ", 1)[1]
else:
artist = rds_default_artist
title = name
if not has_name: logger.warning(f"File does not have a alias in the name table ({track_name})")
title = re.sub(r'\s*[\(\[][^\(\)\[\]]*[\)\]]', '', title) # there might be junk
prt = rds_base.format(artist, title)
rtp = [4] # type 1
rtp.append(prt.find(artist)) # start 1
rtp.append(len(artist)) # len 1
rtp.append(1) # type 2
rtp.append(prt.find(title)) # start 2
rtp.append(len(title) - 1) # len 2
return prt, ','.join(list(map(str, rtp)))
def update_rds(prt: str, rtp: str):
try:
f = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
f.settimeout(1.0)
f.sendto(f"TEXT={prt}\r\nRTP={rtp}\r\n".encode(), udp_host)
f.close()
except Exception as e: logger.error(f"Error updating RDS: {e}")
def load_filelines(path):
try:
with open(path, 'r') as f: return [unidecode.unidecode(line.strip()) for line in f.readlines() if unidecode.unidecode(line.strip())]
except FileNotFoundError:
logger.error(f"Playlist not found: {path}")
return []
def check_if_playlist_modifed(playlist_path: str, custom_playlist: bool = False):
current_day, current_hour = Time.get_day_hour()
morning_playlist_path = os.path.join(playlist_dir, current_day, 'morning')
day_playlist_path = os.path.join(playlist_dir, current_day, 'day')
night_playlist_path = os.path.join(playlist_dir, current_day, 'night')
late_night_playlist_path = os.path.join(playlist_dir, current_day, 'late_night')
if DAY_START <= current_hour < DAY_END and not custom_playlist:
if playlist_path != day_playlist_path:
logger.info("Time changed to day hours, switching playlist...")
return True
elif MORNING_START <= current_hour < MORNING_END and not custom_playlist:
if playlist_path != morning_playlist_path:
logger.info("Time changed to morning hours, switching playlist...")
return True
elif LATE_NIGHT_START <= current_hour < LATE_NIGHT_END and not custom_playlist:
if playlist_path != late_night_playlist_path:
logger.info("Time changed to late night hours, switching playlist...")
return True
else:
if playlist_path != night_playlist_path and not custom_playlist:
logger.info("Time changed to night hours, switching playlist...")
return True
def parse_playlistfile(playlist_path: str):
parser_log = log95.log95("PARSER", logger_level)
parser_log.debug("Reading", playlist_path)
lines = load_filelines(playlist_path)
def check_for_imports(lines: list[str], seen=None) -> list[str]:
nonlocal parser_log
if seen is None: seen = set()
out = []
for line in lines:
if line.startswith("@"):
target = line.removeprefix("@")
if target not in seen:
parser_log.debug("Importing", target)
seen.add(target)
sub_lines = load_filelines(target)
out.extend(check_for_imports(sub_lines, seen))
else: out.append(line)
return out
lines = check_for_imports(lines) # First, import everything
out = []
global_arguments = {}
for line in lines:
arguments = {}
if line.startswith(";") or not line.strip(): continue
if "|" in line:
if line.startswith("|"): # No file name, we're defining global arguments
args = line.removeprefix("|").split(";")
for arg in args:
key, val = arg.split("=", 1)
global_arguments[key] = val
else:
line, args = line.split("|", 1)
args = args.split(";")
for arg in args:
key, val = arg.split("=", 1)
arguments[key] = val
parser_log.debug("Line:", line, "| Global Args:", repr(global_arguments), "| Local args:", repr(arguments))
out.append(([f for f in glob.glob(line) if os.path.isfile(f)], arguments))
return global_arguments, out
def play_playlist(playlist_path, custom_playlist: bool=False):
last_modified_time = Time.get_playlist_modification_time(playlist_path)
try:
global_args, parsed = parse_playlistfile(playlist_path)
except Exception:
logger.info(f"Exception while parsing playlist, retrying in 15 seconds...")
time.sleep(15)
return
lines_args = copy.deepcopy(parsed)
lines = []
for (lns, args) in lines_args:
lns: list[str]
args: dict[str, str]
for i in range(int(args.get("multiplier", 1))): lines.extend(lns)
cross_fade = int(global_args.get("crossfade", 5))
if int(global_args.get("no_shuffle", 0)) == 0: random.shuffle(lines)
playlist: list[tuple[str, bool, bool, bool]] = [] # name, fade in, fade out, official
last_jingiel = True
for line in lines:
if not last_jingiel and random.choice([False, True, False, False]) and JINGIEL_FILE:
playlist.append((line, True, False, True))
playlist.append((JINGIEL_FILE, False, False, False))
last_jingiel = True
else:
playlist.append((line, True, True, True))
last_jingiel = False
del last_jingiel
simple_playlist = [t[0] for t in playlist]
for module in simple_modules: module.on_new_playlist(simple_playlist)
return_pending = False
for i, (track, to_fade_in, to_fade_out, official) in enumerate(playlist):
if return_pending:
logger.info("Return reached, next song will reload the playlist.")
procman.wait_all()
return
elif exit_pending:
logger.info("Quit received, waiting for song end.")
procman.wait_all()
exit()
elif reload_pending:
logger.info("Reload requested, restarting with new arguments after song ending")
procman.wait_all()
return "reload"
track_path = os.path.abspath(os.path.expanduser(track))
for module in simple_modules: module.on_new_track(track_path, i)
track_name = os.path.basename(track_path)
current_modified_time = Time.get_playlist_modification_time(playlist_path)
if current_modified_time > last_modified_time:
logger.info(f"Playlist {playlist_path} has been modified, reloading...")
return_pending = True
continue
return_pending = check_if_playlist_modifed(playlist_path, custom_playlist)
if return_pending and not procman.anything_playing(): continue
logger.info(f"Now playing: {track_name}")
if official:
rds_rt, rds_rtp = process_for_rds(track_name)
update_rds(rds_rt, rds_rtp)
logger.info(f"RT set to '{rds_rt}' (RTP: {rds_rtp})")
if (i + 1) < len(playlist): logger.info(f"Next up: {os.path.basename(playlist[i+1][0])}")
pr = procman.play(track_path, to_fade_in, to_fade_out)
ttw = pr.duration
if to_fade_out: ttw -= cross_fade
if official: print_wait(ttw, 1, pr.duration, f"{track_name}: ")
else: time.sleep(ttw)
def can_delete_file(filepath):
if not os.path.isfile(filepath): return False
directory = os.path.dirname(os.path.abspath(filepath)) or '.'
return os.access(directory, os.W_OK | os.X_OK)
def parse_arguments():
"""Parse command line arguments and return configuration"""
arg = sys.argv[1] if len(sys.argv) > 1 else None
selected_list = None
if arg:
if arg.lower() == "-h":
print("Control files:")
print(" Note: All of these files are one-time only, after they have been acked by the player they will be deleted")
print(" /tmp/radioPlayer_arg - Contains arguments to use")
print()
print("Arguments:")
print(" list:playlist;options - Play custom playlist with options")
print()
exit(0)
if can_delete_file("/tmp/radioPlayer_arg"):
with open("/tmp/radioPlayer_arg", "r") as f: arg = f.read().strip()
os.remove("/tmp/radioPlayer_arg")
if arg:
if arg.startswith("list:"):
selected_list = arg.removeprefix("list:")
logger.info(f"The list {selected_list.split(';')[0]} will be played instead of the daily section lists.")
else: logger.error(f"Invalid argument or file not found: {arg}")
return selected_list
def main():
for filename in os.listdir(MODULES_DIR):
if filename.endswith(".py") and filename != "__init__.py":
module_name = filename[:-3]
module_path = MODULES_DIR / filename
# Load module from file path directly
spec = importlib.util.spec_from_file_location(module_name, module_path)
if not spec: continue
module = importlib.util.module_from_spec(spec)
if not spec.loader: continue
spec.loader.exec_module(module)
if md := getattr(module, "module", None):
simple_modules.append(md)
try:
while True:
selected_list = parse_arguments()
play_loop = True
while play_loop:
if selected_list:
logger.info("Playing custom list")
result = play_playlist(selected_list, True)
if result == "reload": play_loop = False
continue
current_day, current_hour = Time.get_day_hour()
morning_playlist = os.path.join(playlist_dir, current_day, 'morning')
day_playlist = os.path.join(playlist_dir, current_day, 'day')
night_playlist = os.path.join(playlist_dir, current_day, 'night')
late_night_playlist = os.path.join(playlist_dir, current_day, 'late_night')
morning_dir = os.path.dirname(morning_playlist)
day_dir = os.path.dirname(day_playlist)
night_dir = os.path.dirname(night_playlist)
late_night_dir = os.path.dirname(late_night_playlist)
for dir_path in [morning_dir, day_dir, night_dir, late_night_dir]:
if not os.path.exists(dir_path):
logger.info(f"Creating directory: {dir_path}")
os.makedirs(dir_path, exist_ok=True)
for playlist_path in [morning_playlist, day_playlist, night_playlist, late_night_playlist]:
if not os.path.exists(playlist_path):
logger.info(f"Creating empty playlist: {playlist_path}")
with open(playlist_path, 'w'): pass
if DAY_START <= current_hour < DAY_END:
logger.info(f"Playing {current_day} day playlist...")
result = play_playlist(day_playlist, False)
elif MORNING_START <= current_hour < MORNING_END:
logger.info(f"Playing {current_day} morning playlist...")
result = play_playlist(morning_playlist, False)
elif LATE_NIGHT_START <= current_hour < LATE_NIGHT_END:
logger.info(f"Playing {current_day} late_night playlist...")
result = play_playlist(late_night_playlist, False)
else:
logger.info(f"Playing {current_day} night playlist...")
result = play_playlist(night_playlist, False)
if exit_pending: exit()
elif reload_pending:
logger.info("Reload requested, restarting with new arguments...")
result = "reload"
if result == "reload": play_loop = False
except Exception as e:
logger.error(f"Unexpected error: {e}")
procman.stop_all()
raise
finally: procman.stop_all()