0
1
mirror of https://github.com/radio95-rnt/RadioPlayer.git synced 2026-02-26 21:53:54 +01:00
Files
RadioPlayer/radioPlayer.py
2025-09-10 20:58:43 +02:00

426 lines
16 KiB
Python

#!/usr/bin/env python3
import os, socket
import random
import subprocess
import time, datetime
import sys, signal
import threading
import re, unidecode
from dataclasses import dataclass
from datetime import datetime
import log95
def print_wait(ttw: float, frequency: float, duration: float=-1, suffix: str="", bias: float = 0):
interval = 1.0 / frequency
elapsed = 0.0
if duration == -1: duration = ttw
def format_time(seconds):
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
try:
while elapsed < ttw:
print(f"{suffix}{format_time(elapsed+bias)} / {format_time(duration)}", end="\r")
time.sleep(interval)
elapsed += interval
except KeyboardInterrupt:
print()
raise
print(f"{suffix}{format_time(ttw+bias)} / {format_time(duration)}")
MORNING_START = 6
MORNING_END = 10
DAY_START = 10
DAY_END = 20
LATE_NIGHT_START = 0
LATE_NIGHT_END = 6
CROSSFADE_DURATION = 3 # seconds
playlist_dir = "/home/user/playlists"
name_table_path = "/home/user/mixes/name_table.txt"
rds_base = "Gramy: {} - {}"
rds_default_artist = "radio95"
rds_default_name = "Program Godzinny"
udp_host = ("127.0.0.1", 5000)
logger = log95.log95("radioPlayer")
exit_pending = False
reload_pending = False
intr_time = 0
class Time:
@staticmethod
def get_day_hour(): return datetime.now().strftime('%A').lower(), datetime.now().hour
@staticmethod
def get_playlist_modification_time(playlist_path):
try: return os.path.getmtime(playlist_path)
except OSError: return 0
@dataclass
class Process:
process: subprocess.Popen
track: str
started_at: float
duration: float
class ProcessManager:
def __init__(self) -> None:
self.lock = threading.Lock()
self.processes: list[Process] = []
def _get_audio_duration(self, file_path):
result = subprocess.run(['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path], capture_output=True, text=True)
if result.returncode == 0: return float(result.stdout.strip())
return None
def play(self, track_path: str, fade_in: bool=False, fade_out: bool=False) -> Process:
cmd = ['ffplay', '-nodisp', '-hide_banner', '-autoexit', '-loglevel', 'quiet']
duration = self._get_audio_duration(track_path)
if not duration: raise Exception("Failed to get file duration, does it actually exit?", track_path)
filters = []
if fade_in: filters.append(f"afade=t=in:st=0:d={CROSSFADE_DURATION}")
if fade_out: filters.append(f"afade=t=out:st={duration-CROSSFADE_DURATION}:d={CROSSFADE_DURATION}")
if filters: cmd.extend(['-af', ",".join(filters)])
cmd.append(track_path)
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)
pr = Process(proc, track_path, time.time(), duration)
with self.lock: self.processes.append(pr)
return pr
def anything_playing(self) -> bool:
with self.lock:
for process in self.processes[:]:
if process.process.poll() is not None: self.processes.remove(process)
return bool(self.processes)
def stop_all(self, timeout: float | None = 2) -> bool:
success = True
with self.lock:
for process in self.processes:
process.process.terminate()
try: process.process.wait(timeout)
except:
success = False
continue
self.processes.remove(process)
return success
def wait_all(self, timeout: float | None = None) -> None:
with self.lock:
for process in self.processes:
process.process.wait(timeout)
self.processes.remove(process)
procman = ProcessManager()
def handle_sigint(signum, frame):
global exit_pending, intr_time
logger.info("Received SIGINT")
if (time.time() - intr_time) > 10:
intr_time = time.time()
logger.info("Will quit on song end.")
exit_pending = True
return
else:
logger.warning("Force-Quit pending")
procman.stop_all(None)
exit(0)
def handle_sighup(signum, frame):
global reload_pending
reload_pending = True
signal.signal(signal.SIGINT, handle_sigint)
signal.signal(signal.SIGHUP, handle_sighup) # type: ignore
def load_dict_from_custom_format(file_path: str) -> dict[str, str]:
try:
result_dict = {}
with open(file_path, 'r') as file:
for line in file:
if line.strip() == "" or line.startswith(";"): continue
key, value = line.split(':', 1)
result_dict[key.strip()] = value.strip()
return result_dict
except FileNotFoundError:
logger.error(f"{name_table_path} does not exist, or could not be accesed")
return {}
def update_rds(track_name: str):
try:
name_table = load_dict_from_custom_format(name_table_path)
try:
name = name_table[track_name]
has_name = True
except KeyError as e:
has_name = False
name = track_name.rsplit(".", 1)[0]
name = re.sub(r'^\s*\d+\s*[-.]?\s*', '', name)
if " - " in name:
count = name.count(" - ")
while count != 1: # youtube reuploads, to avoid things like ilikedick123 - Micheal Jackson - Smooth Criminal
name = name.split(" - ", 1)[1]
count = name.count(" - ")
artist = name.split(" - ", 1)[0]
title = name.split(" - ", 1)[1]
else:
artist = rds_default_artist
title = name
if not has_name: logger.warning(f"File does not have a alias in the name table ({track_name})")
title = unidecode.unidecode(title)
artist = unidecode.unidecode(artist)
title = re.sub(r'\s*[\(\[][^\(\)\[\]]*[\)\]]', '', title) # there might be junk
prt = rds_base.format(artist, title)
f = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
f.settimeout(1.0)
f.sendto(f"TEXT={prt}\r\n".encode(), udp_host)
logger.info("RT set to", prt)
rtp = [4] # type 1
rtp.append(prt.find(artist)) # start 1
rtp.append(len(artist)) # len 1
rtp.append(1) # type 2
rtp.append(prt.find(title)) # start 2
rtp.append(len(title)) # len 2
f.sendto(f"RTP={','.join(list(map(str, rtp)))}\r\n".encode(), udp_host)
f.close()
except Exception as e: logger.error(f"Error updating RDS: {e}")
def load_playlist(playlist_path):
try:
with open(playlist_path, 'r') as f: return [line.strip() for line in f.readlines() if line.strip()]
except FileNotFoundError:
logger.error(f"Playlist not found: {playlist_path}")
return []
def get_newest_track(tracks):
if not tracks: return None
newest_track = None
newest_time = 0
for track in tracks:
track_path = os.path.abspath(os.path.expanduser(track))
try:
mod_time = os.path.getmtime(track_path)
if mod_time > newest_time:
newest_time = mod_time
newest_track = track
except OSError: continue
return newest_track
def check_if_playlist_modifed(playlist_path: str, custom_playlist: bool = False):
current_day, current_hour = Time.get_day_hour()
morning_playlist_path = os.path.join(playlist_dir, current_day, 'morning')
day_playlist_path = os.path.join(playlist_dir, current_day, 'day')
night_playlist_path = os.path.join(playlist_dir, current_day, 'night')
late_night_playlist_path = os.path.join(playlist_dir, current_day, 'late_night')
if DAY_START <= current_hour < DAY_END and not custom_playlist:
if playlist_path != day_playlist_path:
logger.info("Time changed to day hours, switching playlist...")
return True
elif MORNING_START <= current_hour < MORNING_END and not custom_playlist:
if playlist_path != morning_playlist_path:
logger.info("Time changed to morning hours, switching playlist...")
return True
elif LATE_NIGHT_START <= current_hour < LATE_NIGHT_END and not custom_playlist:
if playlist_path != late_night_playlist_path:
logger.info("Time changed to late night hours, switching playlist...")
return True
else:
if playlist_path != night_playlist_path and not custom_playlist:
logger.info("Time changed to night hours, switching playlist...")
return True
def play_playlist(playlist_path, custom_playlist: bool=False, play_newest_first=False, do_shuffle=True):
last_modified_time = Time.get_playlist_modification_time(playlist_path)
tracks = load_playlist(playlist_path)
if not tracks:
logger.info(f"No tracks found in {playlist_path}, checking again in 15 seconds...")
time.sleep(15)
return
if do_shuffle: random.seed()
start_index = 0
# Normal playlist preparation
if play_newest_first:
newest_track = get_newest_track(tracks)
if newest_track:
logger.info(f"Playing newest track first: {os.path.basename(newest_track)}")
tracks.remove(newest_track)
if do_shuffle: random.shuffle(tracks)
tracks.insert(0, newest_track)
else:
if do_shuffle: random.shuffle(tracks)
return_pending = False
for i, track in enumerate(tracks[start_index:], start_index):
if return_pending:
logger.info("Return reached, next song will reload the playlist.")
procman.wait_all()
return
elif exit_pending:
logger.info("Quit received, waiting for song end.")
procman.wait_all()
exit()
elif reload_pending:
logger.info("Reload requested, restarting with new arguments...")
procman.wait_all()
return "reload"
track_path = os.path.abspath(os.path.expanduser(track))
track_name = os.path.basename(track_path)
current_modified_time = Time.get_playlist_modification_time(playlist_path)
if current_modified_time > last_modified_time:
logger.info(f"Playlist {playlist_path} has been modified, reloading...")
return_pending = True
continue
return_pending = check_if_playlist_modifed(playlist_path, custom_playlist)
if return_pending and not procman.anything_playing(): continue
logger.info(f"Now playing: {track_name}")
update_rds(track_name)
pr = procman.play(track_path, True, True)
print_wait(pr.duration - CROSSFADE_DURATION, 1, pr.duration, f"{track_name}: ")
def can_delete_file(filepath):
if not os.path.isfile(filepath): return False
directory = os.path.dirname(os.path.abspath(filepath)) or '.'
return os.access(directory, os.W_OK | os.X_OK)
def parse_arguments():
"""Parse command line arguments and return configuration"""
arg = sys.argv[1] if len(sys.argv) > 1 else None
play_newest_first = False
do_shuffle = True
pre_track_path = None
selected_list = None
if arg:
if arg.lower() == "-h":
print("Control files:")
print(" Note: All of these files are one-time only, after they have been acked by the player they will be deleted")
print(" /tmp/radioPlayer_arg - Contains arguments to use")
print()
print("Arguments:")
print(" n - Play newest song first")
print(" list:playlist;options - Play custom playlist with options")
print(" /path/to/file - Play specific file first")
print()
print(f"Crossfade: {CROSSFADE_DURATION}-second crossfade is automatically applied between tracks")
exit(0)
if can_delete_file("/tmp/radioPlayer_arg"):
with open("/tmp/radioPlayer_arg", "r") as f: arg = f.read().strip()
os.remove("/tmp/radioPlayer_arg")
if arg:
if arg.lower() == "n":
play_newest_first = True
logger.info("Newest song will be played first")
elif arg.startswith("list:"):
selected_list = arg.removeprefix("list:")
logger.info(f"The list {selected_list.split(';')[0]} will be played instead of the daily section lists.")
for option in selected_list.split(";"):
if option == "n": play_newest_first = True
elif option == "ns": do_shuffle = False
selected_list = selected_list.split(";")[0]
elif os.path.isfile(arg):
pre_track_path = arg
logger.info(f"Will play requested song first: {arg}")
else: logger.error(f"Invalid argument or file not found: {arg}")
return play_newest_first, do_shuffle, pre_track_path, selected_list
def main():
try:
while True:
play_newest_first, do_shuffle, pre_track_path, selected_list = parse_arguments()
if pre_track_path:
track_name = os.path.basename(pre_track_path)
logger.info(f"Now playing: {track_name}")
update_rds(track_name)
procman.play(pre_track_path).process.wait()
if exit_pending:
exit()
elif reload_pending:
logger.info("Reload requested, restarting with new arguments...")
continue # Restart the main loop
play_loop = True
while play_loop:
if selected_list:
logger.info("Playing custom list")
result = play_playlist(selected_list, True, play_newest_first, do_shuffle)
if result == "reload": play_loop = False
continue
current_day, current_hour = Time.get_day_hour()
morning_playlist = os.path.join(playlist_dir, current_day, 'morning')
day_playlist = os.path.join(playlist_dir, current_day, 'day')
night_playlist = os.path.join(playlist_dir, current_day, 'night')
late_night_playlist = os.path.join(playlist_dir, current_day, 'late_night')
morning_dir = os.path.dirname(morning_playlist)
day_dir = os.path.dirname(day_playlist)
night_dir = os.path.dirname(night_playlist)
late_night_dir = os.path.dirname(late_night_playlist)
for dir_path in [morning_dir, day_dir, night_dir, late_night_dir]:
if not os.path.exists(dir_path):
logger.info(f"Creating directory: {dir_path}")
os.makedirs(dir_path, exist_ok=True)
for playlist_path in [morning_playlist, day_playlist, night_playlist, late_night_playlist]:
if not os.path.exists(playlist_path):
logger.info(f"Creating empty playlist: {playlist_path}")
with open(playlist_path, 'w'): pass
if DAY_START <= current_hour < DAY_END:
logger.info(f"Playing {current_day} day playlist...")
result = play_playlist(day_playlist, False, play_newest_first, do_shuffle)
elif MORNING_START <= current_hour < MORNING_END:
logger.info(f"Playing {current_day} morning playlist...")
result = play_playlist(morning_playlist, False, play_newest_first, do_shuffle)
elif LATE_NIGHT_START <= current_hour < LATE_NIGHT_END:
logger.info(f"Playing {current_day} late_night playlist...")
result = play_playlist(late_night_playlist, False, play_newest_first, do_shuffle)
else:
logger.info(f"Playing {current_day} night playlist...")
result = play_playlist(night_playlist, False, play_newest_first, do_shuffle)
if exit_pending: exit()
elif reload_pending:
logger.info("Reload requested, restarting with new arguments...")
result = "reload"
if result == "reload": play_loop = False
except Exception as e:
logger.error(f"Unexpected error: {e}")
procman.stop_all()
raise
finally: procman.stop_all()