0
1
mirror of https://github.com/radio95-rnt/RadioPlayer.git synced 2026-02-26 21:53:54 +01:00

modularize rds

This commit is contained in:
Kuba
2025-10-11 16:59:05 +02:00
parent bcef2ea2af
commit 07a5967919
4 changed files with 107 additions and 97 deletions

80
modules/rds.py Normal file
View File

@@ -0,0 +1,80 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
class PlayerModule:
def on_new_playlist(self, playlist: list[tuple[str, bool, bool, bool]]):
pass
def on_new_track(self, index: int, track: str, to_fade_in: bool, to_fade_out: bool, official: bool):
pass
import socket, re, log95, os
name_table_path = "/home/user/mixes/name_table.txt"
rds_base = "Gramy: {} - {}"
rds_default_artist = "radio95"
rds_default_name = "Program Godzinny"
udp_host = ("127.0.0.1", 5000)
logger = log95.log95("RDS-MODULE")
def load_dict_from_custom_format(file_path: str) -> dict[str, str]:
try:
result_dict = {}
with open(file_path, 'r') as file:
for line in file:
if line.strip() == "" or line.startswith(";"): continue
key, value = line.split(':', 1)
result_dict[key.strip()] = value.strip()
return result_dict
except FileNotFoundError:
logger.error(f"{name_table_path} does not exist, or could not be accesed")
return {}
def update_rds(track_name: str):
name_table = load_dict_from_custom_format(name_table_path)
try:
name = name_table[track_name]
has_name = True
except KeyError:
has_name = False
name = track_name.rsplit(".", 1)[0]
name = re.sub(r'^\s*\d+\s*[-.]?\s*', '', name)
if " - " in name:
count = name.count(" - ")
while count != 1: # youtube reuploads, to avoid things like ilikedick123 - Micheal Jackson - Smooth Criminal
name = name.split(" - ", 1)[1]
count = name.count(" - ")
artist = name.split(" - ", 1)[0]
title = name.split(" - ", 1)[1]
else:
artist = rds_default_artist
title = name
if not has_name: logger.warning(f"File does not have a alias in the name table ({track_name})")
title = re.sub(r'\s*[\(\[][^\(\)\[\]]*[\)\]]', '', title) # there might be junk
prt = rds_base.format(artist, title)
rtp = [4] # type 1
rtp.append(prt.find(artist)) # start 1
rtp.append(len(artist)) # len 1
rtp.append(1) # type 2
rtp.append(prt.find(title)) # start 2
rtp.append(len(title) - 1) # len 2
try:
f = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
f.settimeout(1.0)
f.sendto(f"TEXT={prt}\r\nRTP={rtp}\r\n".encode(), udp_host)
f.close()
except Exception as e: logger.error(f"Error updating RDS: {e}")
return prt, ','.join(list(map(str, rtp)))
class Module(PlayerModule):
def on_new_track(self, index: int, track: str, to_fade_in: bool, to_fade_out: bool, official: bool):
if official:
rds_rt, rds_rtp = update_rds(os.path.basename(track))
logger.info(f"RT set to '{rds_rt}' (RTP: {rds_rtp})")

View File

@@ -1,25 +1,23 @@
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from player_modules import PlayerModule
def write_playlist(tracks: list, i: int):
lines = tracks[:i] + [f"> {tracks[i]}"] + tracks[i+1:]
with open("/tmp/radioPlayer_playlist", "w") as f:
for line in lines:
try: f.write(line + "\n")
except UnicodeEncodeError:
print(line.encode('utf-8', errors='ignore').decode('utf-8'))
raise
from typing import TYPE_CHECKING
if TYPE_CHECKING:
class PlayerModule:
def on_new_playlist(self, playlist: list[tuple[str, bool, bool, bool]]):
pass
def on_new_track(self, index: int, track: str, to_fade_in: bool, to_fade_out: bool, official: bool):
pass
class Module(PlayerModule):
def __init__(self) -> None:
self.playlist = []
def on_new_playlist(self, playlist: list[str]):
self.playlist = playlist
def on_new_track(self, track: str, index: int):
write_playlist(self.playlist, index)
def on_new_playlist(self, playlist: list[tuple[str, bool, bool, bool]]):
self.playlist = [t[0] for t in playlist]
def on_new_track(self, index: int, track: str, to_fade_in: bool, to_fade_out: bool, official: bool):
lines = self.playlist[:index] + [f"> {self.playlist[index]}"] + self.playlist[index+1:]
with open("/tmp/radioPlayer_playlist", "w") as f:
for line in lines:
try: f.write(line + "\n")
except UnicodeEncodeError:
print(line.encode('utf-8', errors='ignore').decode('utf-8'))
raise
module = Module()

View File

@@ -1,5 +0,0 @@
class PlayerModule:
def on_new_playlist(self, playlist: list[str]):
pass
def on_new_track(self, track: str, index: int):
pass

View File

@@ -3,18 +3,22 @@ DEBUG = False
import time, datetime
import os, subprocess, importlib.util
import sys, signal, threading, glob
import re, unidecode
import unidecode
import random
import socket
from dataclasses import dataclass
import log95, copy
from pathlib import Path
from player_modules import PlayerModule
class PlayerModule:
def on_new_playlist(self, playlist: list[tuple[str, bool, bool, bool]]):
pass
def on_new_track(self, index: int, track: str, to_fade_in: bool, to_fade_out: bool, official: bool):
pass
simple_modules: list[PlayerModule] = []
SCRIPT_DIR = Path(__file__).resolve().parent
MODULES_DIR = SCRIPT_DIR / ".." / "modules"
MODULES_DIR = SCRIPT_DIR / "modules"
MODULES_DIR = MODULES_DIR.resolve()
def print_wait(ttw: float, frequency: float, duration: float=-1, prefix: str="", bias: float = 0):
@@ -49,13 +53,6 @@ LATE_NIGHT_END = 5
JINGIEL_FILE = "/home/user/Jingiel.mp3"
playlist_dir = "/home/user/playlists"
name_table_path = "/home/user/mixes/name_table.txt"
rds_base = "Gramy: {} - {}"
rds_default_artist = "radio95"
rds_default_name = "Program Godzinny"
udp_host = ("127.0.0.1", 5000)
logger_level = log95.log95Levels.DEBUG if DEBUG else log95.log95Levels.CRITICAL_ERROR
logger = log95.log95("radioPlayer", logger_level)
@@ -143,61 +140,6 @@ def handle_sighup(signum, frame):
signal.signal(signal.SIGINT, handle_sigint)
signal.signal(signal.SIGHUP, handle_sighup) # type: ignore
def load_dict_from_custom_format(file_path: str) -> dict[str, str]:
try:
result_dict = {}
with open(file_path, 'r') as file:
for line in file:
if line.strip() == "" or line.startswith(";"): continue
key, value = line.split(':', 1)
result_dict[key.strip()] = value.strip()
return result_dict
except FileNotFoundError:
logger.error(f"{name_table_path} does not exist, or could not be accesed")
return {}
def process_for_rds(track_name: str):
name_table = load_dict_from_custom_format(name_table_path)
try:
name = name_table[track_name]
has_name = True
except KeyError:
has_name = False
name = track_name.rsplit(".", 1)[0]
name = re.sub(r'^\s*\d+\s*[-.]?\s*', '', name)
if " - " in name:
count = name.count(" - ")
while count != 1: # youtube reuploads, to avoid things like ilikedick123 - Micheal Jackson - Smooth Criminal
name = name.split(" - ", 1)[1]
count = name.count(" - ")
artist = name.split(" - ", 1)[0]
title = name.split(" - ", 1)[1]
else:
artist = rds_default_artist
title = name
if not has_name: logger.warning(f"File does not have a alias in the name table ({track_name})")
title = re.sub(r'\s*[\(\[][^\(\)\[\]]*[\)\]]', '', title) # there might be junk
prt = rds_base.format(artist, title)
rtp = [4] # type 1
rtp.append(prt.find(artist)) # start 1
rtp.append(len(artist)) # len 1
rtp.append(1) # type 2
rtp.append(prt.find(title)) # start 2
rtp.append(len(title) - 1) # len 2
return prt, ','.join(list(map(str, rtp)))
def update_rds(prt: str, rtp: str):
try:
f = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
f.settimeout(1.0)
f.sendto(f"TEXT={prt}\r\nRTP={rtp}\r\n".encode(), udp_host)
f.close()
except Exception as e: logger.error(f"Error updating RDS: {e}")
def load_filelines(path):
try:
with open(path, 'r') as f: return [unidecode.unidecode(line.strip()) for line in f.readlines() if unidecode.unidecode(line.strip())]
@@ -303,8 +245,7 @@ def play_playlist(playlist_path, custom_playlist: bool=False):
last_jingiel = False
del last_jingiel
simple_playlist = [t[0] for t in playlist]
for module in simple_modules: module.on_new_playlist(simple_playlist)
for module in simple_modules: module.on_new_playlist(playlist)
return_pending = False
@@ -322,7 +263,7 @@ def play_playlist(playlist_path, custom_playlist: bool=False):
procman.wait_all()
return "reload"
track_path = os.path.abspath(os.path.expanduser(track))
for module in simple_modules: module.on_new_track(track_path, i)
for module in simple_modules: module.on_new_track(i, track_path, to_fade_in, to_fade_out, official)
track_name = os.path.basename(track_path)
current_modified_time = Time.get_playlist_modification_time(playlist_path)
@@ -335,10 +276,6 @@ def play_playlist(playlist_path, custom_playlist: bool=False):
if return_pending and not procman.anything_playing(): continue
logger.info(f"Now playing: {track_name}")
if official:
rds_rt, rds_rtp = process_for_rds(track_name)
update_rds(rds_rt, rds_rtp)
logger.info(f"RT set to '{rds_rt}' (RTP: {rds_rtp})")
if (i + 1) < len(playlist): logger.info(f"Next up: {os.path.basename(playlist[i+1][0])}")
pr = procman.play(track_path, to_fade_in, to_fade_out)