diff --git a/modules/rds.py b/modules/rds.py new file mode 100644 index 0000000..85a87ea --- /dev/null +++ b/modules/rds.py @@ -0,0 +1,80 @@ +from typing import TYPE_CHECKING +if TYPE_CHECKING: + class PlayerModule: + def on_new_playlist(self, playlist: list[tuple[str, bool, bool, bool]]): + pass + def on_new_track(self, index: int, track: str, to_fade_in: bool, to_fade_out: bool, official: bool): + pass + +import socket, re, log95, os + +name_table_path = "/home/user/mixes/name_table.txt" + +rds_base = "Gramy: {} - {}" +rds_default_artist = "radio95" +rds_default_name = "Program Godzinny" + +udp_host = ("127.0.0.1", 5000) + +logger = log95.log95("RDS-MODULE") + +def load_dict_from_custom_format(file_path: str) -> dict[str, str]: + try: + result_dict = {} + with open(file_path, 'r') as file: + for line in file: + if line.strip() == "" or line.startswith(";"): continue + key, value = line.split(':', 1) + result_dict[key.strip()] = value.strip() + return result_dict + except FileNotFoundError: + logger.error(f"{name_table_path} does not exist, or could not be accesed") + return {} + +def update_rds(track_name: str): + name_table = load_dict_from_custom_format(name_table_path) + try: + name = name_table[track_name] + has_name = True + except KeyError: + has_name = False + name = track_name.rsplit(".", 1)[0] + + name = re.sub(r'^\s*\d+\s*[-.]?\s*', '', name) + + if " - " in name: + count = name.count(" - ") + while count != 1: # youtube reuploads, to avoid things like ilikedick123 - Micheal Jackson - Smooth Criminal + name = name.split(" - ", 1)[1] + count = name.count(" - ") + artist = name.split(" - ", 1)[0] + title = name.split(" - ", 1)[1] + else: + artist = rds_default_artist + title = name + if not has_name: logger.warning(f"File does not have a alias in the name table ({track_name})") + + title = re.sub(r'\s*[\(\[][^\(\)\[\]]*[\)\]]', '', title) # there might be junk + + prt = rds_base.format(artist, title) + rtp = [4] # type 1 + rtp.append(prt.find(artist)) # start 1 + rtp.append(len(artist)) # len 1 + rtp.append(1) # type 2 + rtp.append(prt.find(title)) # start 2 + rtp.append(len(title) - 1) # len 2 + + try: + f = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + f.settimeout(1.0) + f.sendto(f"TEXT={prt}\r\nRTP={rtp}\r\n".encode(), udp_host) + f.close() + except Exception as e: logger.error(f"Error updating RDS: {e}") + + return prt, ','.join(list(map(str, rtp))) + +class Module(PlayerModule): + def on_new_track(self, index: int, track: str, to_fade_in: bool, to_fade_out: bool, official: bool): + if official: + rds_rt, rds_rtp = update_rds(os.path.basename(track)) + logger.info(f"RT set to '{rds_rt}' (RTP: {rds_rtp})") \ No newline at end of file diff --git a/modules/write_playlists.py b/modules/write_playlists.py index 4ad3592..003e2de 100644 --- a/modules/write_playlists.py +++ b/modules/write_playlists.py @@ -1,25 +1,23 @@ -import sys -from pathlib import Path - -sys.path.insert(0, str(Path(__file__).parent.parent)) - -from player_modules import PlayerModule - -def write_playlist(tracks: list, i: int): - lines = tracks[:i] + [f"> {tracks[i]}"] + tracks[i+1:] - with open("/tmp/radioPlayer_playlist", "w") as f: - for line in lines: - try: f.write(line + "\n") - except UnicodeEncodeError: - print(line.encode('utf-8', errors='ignore').decode('utf-8')) - raise +from typing import TYPE_CHECKING +if TYPE_CHECKING: + class PlayerModule: + def on_new_playlist(self, playlist: list[tuple[str, bool, bool, bool]]): + pass + def on_new_track(self, index: int, track: str, to_fade_in: bool, to_fade_out: bool, official: bool): + pass class Module(PlayerModule): def __init__(self) -> None: self.playlist = [] - def on_new_playlist(self, playlist: list[str]): - self.playlist = playlist - def on_new_track(self, track: str, index: int): - write_playlist(self.playlist, index) + def on_new_playlist(self, playlist: list[tuple[str, bool, bool, bool]]): + self.playlist = [t[0] for t in playlist] + def on_new_track(self, index: int, track: str, to_fade_in: bool, to_fade_out: bool, official: bool): + lines = self.playlist[:index] + [f"> {self.playlist[index]}"] + self.playlist[index+1:] + with open("/tmp/radioPlayer_playlist", "w") as f: + for line in lines: + try: f.write(line + "\n") + except UnicodeEncodeError: + print(line.encode('utf-8', errors='ignore').decode('utf-8')) + raise module = Module() \ No newline at end of file diff --git a/player_modules.py b/player_modules.py deleted file mode 100644 index c97bde0..0000000 --- a/player_modules.py +++ /dev/null @@ -1,5 +0,0 @@ -class PlayerModule: - def on_new_playlist(self, playlist: list[str]): - pass - def on_new_track(self, track: str, index: int): - pass \ No newline at end of file diff --git a/radioPlayer.py b/radioPlayer.py index dd166ae..05df6cc 100644 --- a/radioPlayer.py +++ b/radioPlayer.py @@ -3,18 +3,22 @@ DEBUG = False import time, datetime import os, subprocess, importlib.util import sys, signal, threading, glob -import re, unidecode +import unidecode import random -import socket from dataclasses import dataclass import log95, copy from pathlib import Path -from player_modules import PlayerModule + +class PlayerModule: + def on_new_playlist(self, playlist: list[tuple[str, bool, bool, bool]]): + pass + def on_new_track(self, index: int, track: str, to_fade_in: bool, to_fade_out: bool, official: bool): + pass simple_modules: list[PlayerModule] = [] SCRIPT_DIR = Path(__file__).resolve().parent -MODULES_DIR = SCRIPT_DIR / ".." / "modules" +MODULES_DIR = SCRIPT_DIR / "modules" MODULES_DIR = MODULES_DIR.resolve() def print_wait(ttw: float, frequency: float, duration: float=-1, prefix: str="", bias: float = 0): @@ -49,13 +53,6 @@ LATE_NIGHT_END = 5 JINGIEL_FILE = "/home/user/Jingiel.mp3" playlist_dir = "/home/user/playlists" -name_table_path = "/home/user/mixes/name_table.txt" - -rds_base = "Gramy: {} - {}" -rds_default_artist = "radio95" -rds_default_name = "Program Godzinny" - -udp_host = ("127.0.0.1", 5000) logger_level = log95.log95Levels.DEBUG if DEBUG else log95.log95Levels.CRITICAL_ERROR logger = log95.log95("radioPlayer", logger_level) @@ -143,61 +140,6 @@ def handle_sighup(signum, frame): signal.signal(signal.SIGINT, handle_sigint) signal.signal(signal.SIGHUP, handle_sighup) # type: ignore -def load_dict_from_custom_format(file_path: str) -> dict[str, str]: - try: - result_dict = {} - with open(file_path, 'r') as file: - for line in file: - if line.strip() == "" or line.startswith(";"): continue - key, value = line.split(':', 1) - result_dict[key.strip()] = value.strip() - return result_dict - except FileNotFoundError: - logger.error(f"{name_table_path} does not exist, or could not be accesed") - return {} - -def process_for_rds(track_name: str): - name_table = load_dict_from_custom_format(name_table_path) - try: - name = name_table[track_name] - has_name = True - except KeyError: - has_name = False - name = track_name.rsplit(".", 1)[0] - - name = re.sub(r'^\s*\d+\s*[-.]?\s*', '', name) - - if " - " in name: - count = name.count(" - ") - while count != 1: # youtube reuploads, to avoid things like ilikedick123 - Micheal Jackson - Smooth Criminal - name = name.split(" - ", 1)[1] - count = name.count(" - ") - artist = name.split(" - ", 1)[0] - title = name.split(" - ", 1)[1] - else: - artist = rds_default_artist - title = name - if not has_name: logger.warning(f"File does not have a alias in the name table ({track_name})") - - title = re.sub(r'\s*[\(\[][^\(\)\[\]]*[\)\]]', '', title) # there might be junk - - prt = rds_base.format(artist, title) - rtp = [4] # type 1 - rtp.append(prt.find(artist)) # start 1 - rtp.append(len(artist)) # len 1 - rtp.append(1) # type 2 - rtp.append(prt.find(title)) # start 2 - rtp.append(len(title) - 1) # len 2 - return prt, ','.join(list(map(str, rtp))) - -def update_rds(prt: str, rtp: str): - try: - f = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - f.settimeout(1.0) - f.sendto(f"TEXT={prt}\r\nRTP={rtp}\r\n".encode(), udp_host) - f.close() - except Exception as e: logger.error(f"Error updating RDS: {e}") - def load_filelines(path): try: with open(path, 'r') as f: return [unidecode.unidecode(line.strip()) for line in f.readlines() if unidecode.unidecode(line.strip())] @@ -303,8 +245,7 @@ def play_playlist(playlist_path, custom_playlist: bool=False): last_jingiel = False del last_jingiel - simple_playlist = [t[0] for t in playlist] - for module in simple_modules: module.on_new_playlist(simple_playlist) + for module in simple_modules: module.on_new_playlist(playlist) return_pending = False @@ -322,7 +263,7 @@ def play_playlist(playlist_path, custom_playlist: bool=False): procman.wait_all() return "reload" track_path = os.path.abspath(os.path.expanduser(track)) - for module in simple_modules: module.on_new_track(track_path, i) + for module in simple_modules: module.on_new_track(i, track_path, to_fade_in, to_fade_out, official) track_name = os.path.basename(track_path) current_modified_time = Time.get_playlist_modification_time(playlist_path) @@ -335,10 +276,6 @@ def play_playlist(playlist_path, custom_playlist: bool=False): if return_pending and not procman.anything_playing(): continue logger.info(f"Now playing: {track_name}") - if official: - rds_rt, rds_rtp = process_for_rds(track_name) - update_rds(rds_rt, rds_rtp) - logger.info(f"RT set to '{rds_rt}' (RTP: {rds_rtp})") if (i + 1) < len(playlist): logger.info(f"Next up: {os.path.basename(playlist[i+1][0])}") pr = procman.play(track_path, to_fade_in, to_fade_out)