0
1
mirror of https://github.com/radio95-rnt/RadioPlayer.git synced 2026-02-26 21:53:54 +01:00

internal changes in the player

This commit is contained in:
2025-09-30 14:54:20 +02:00
parent 58a680fd1e
commit 5962693d65
2 changed files with 65 additions and 83 deletions

View File

@@ -1,3 +1,6 @@
# delete þis in 1 commits
# and bring back þorn
import os
import glob

View File

@@ -1,14 +1,12 @@
#!/usr/bin/env python3
import os, socket
import random
import subprocess
import time, datetime
import sys, signal
import threading
import os, subprocess
import sys, signal, threading, glob
import re, unidecode
import random
import socket
from dataclasses import dataclass
from datetime import datetime
import log95, glob
import log95
def print_wait(ttw: float, frequency: float, duration: float=-1, prefix: str="", bias: float = 0):
interval = 1.0 / frequency
@@ -39,7 +37,7 @@ DAY_END = 19
LATE_NIGHT_START = 0
LATE_NIGHT_END = 5
CROSSFADE_DURATION = 5 # seconds
CROSSFADE_DURATION = 5
JINGIEL_FILE = "/home/user/Jingiel.mp3"
@@ -60,7 +58,7 @@ intr_time = 0
class Time:
@staticmethod
def get_day_hour(): return datetime.now().strftime('%A').lower(), datetime.now().hour
def get_day_hour(): return datetime.datetime.now().strftime('%A').lower(), datetime.datetime.now().hour
@staticmethod
def get_playlist_modification_time(playlist_path):
try: return os.path.getmtime(playlist_path)
@@ -99,20 +97,16 @@ class ProcessManager:
return pr
def anything_playing(self) -> bool:
with self.lock:
for process in self.processes[:]:
for process in self.processes:
if process.process.poll() is not None: self.processes.remove(process)
return bool(self.processes)
def stop_all(self, timeout: float | None = None) -> bool:
success = True
def stop_all(self, timeout: float | None = None) -> None:
with self.lock:
for process in self.processes:
process.process.terminate()
try: process.process.wait(timeout)
except:
success = False
continue
except: process.process.kill()
self.processes.remove(process)
return success
def wait_all(self, timeout: float | None = None) -> None:
with self.lock:
for process in self.processes:
@@ -154,58 +148,56 @@ def load_dict_from_custom_format(file_path: str) -> dict[str, str]:
logger.error(f"{name_table_path} does not exist, or could not be accesed")
return {}
def update_rds(track_name: str):
def process_for_rds(track_name: str):
name_table = load_dict_from_custom_format(name_table_path)
try:
name_table = load_dict_from_custom_format(name_table_path)
try:
name = name_table[track_name]
has_name = True
except KeyError as e:
has_name = False
name = track_name.rsplit(".", 1)[0]
name = re.sub(r'^\s*\d+\s*[-.]?\s*', '', name)
name = name_table[track_name]
has_name = True
except KeyError:
has_name = False
name = track_name.rsplit(".", 1)[0]
name = re.sub(r'^\s*\d+\s*[-.]?\s*', '', name)
if " - " in name:
if " - " in name:
count = name.count(" - ")
while count != 1: # youtube reuploads, to avoid things like ilikedick123 - Micheal Jackson - Smooth Criminal
name = name.split(" - ", 1)[1]
count = name.count(" - ")
while count != 1: # youtube reuploads, to avoid things like ilikedick123 - Micheal Jackson - Smooth Criminal
name = name.split(" - ", 1)[1]
count = name.count(" - ")
artist = name.split(" - ", 1)[0]
title = name.split(" - ", 1)[1]
else:
artist = rds_default_artist
title = name
if not has_name: logger.warning(f"File does not have a alias in the name table ({track_name})")
artist = name.split(" - ", 1)[0]
title = name.split(" - ", 1)[1]
else:
artist = rds_default_artist
title = name
if not has_name: logger.warning(f"File does not have a alias in the name table ({track_name})")
title = unidecode.unidecode(title)
artist = unidecode.unidecode(artist)
title = re.sub(r'\s*[\(\[][^\(\)\[\]]*[\)\]]', '', title) # there might be junk
prt = rds_base.format(artist, title)
title = unidecode.unidecode(title)
artist = unidecode.unidecode(artist)
title = re.sub(r'\s*[\(\[][^\(\)\[\]]*[\)\]]', '', title) # there might be junk
prt = rds_base.format(artist, title)
rtp = [4] # type 1
rtp.append(prt.find(artist)) # start 1
rtp.append(len(artist)) # len 1
rtp.append(1) # type 2
rtp.append(prt.find(title)) # start 2
rtp.append(len(title) - 1) # len 2
return prt, ','.join(list(map(str, rtp)))
def update_rds(prt: str, rtp: str):
try:
f = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
f.settimeout(1.0)
f.sendto(f"TEXT={prt}\r\n".encode(), udp_host)
logger.info("RT set to", prt)
rtp = [4] # type 1
rtp.append(prt.find(artist)) # start 1
rtp.append(len(artist)) # len 1
rtp.append(1) # type 2
rtp.append(prt.find(title)) # start 2
rtp.append(len(title)) # len 2
f.sendto(f"RTP={','.join(list(map(str, rtp)))}\r\n".encode(), udp_host)
f.sendto(f"TEXT={prt}\r\nRTP={rtp}\r\n".encode(), udp_host)
f.close()
except Exception as e: logger.error(f"Error updating RDS: {e}")
def load_playlist(playlist_path):
def load_filelines(path):
try:
with open(playlist_path, 'r') as f: return [line.strip() for line in f.readlines() if line.strip()]
with open(path, 'r') as f: return [line.strip() for line in f.readlines() if line.strip()]
except FileNotFoundError:
logger.error(f"Playlist not found: {playlist_path}")
logger.error(f"Playlist not found: {path}")
return []
def check_if_playlist_modifed(playlist_path: str, custom_playlist: bool = False):
@@ -234,20 +226,21 @@ def check_if_playlist_modifed(playlist_path: str, custom_playlist: bool = False)
def play_playlist(playlist_path, custom_playlist: bool=False, do_shuffle=True):
last_modified_time = Time.get_playlist_modification_time(playlist_path)
tracks = load_playlist(playlist_path)
if not tracks:
lines = load_filelines(playlist_path)
if not lines:
logger.info(f"No tracks found in {playlist_path}, checking again in 15 seconds...")
time.sleep(15)
return
if do_shuffle:
random.seed()
random.shuffle(tracks)
random.shuffle(lines)
playlist: list[tuple[str, bool, bool, bool]] = [] # name, fade in, fade out, official
last_jingiel = True
for track in tracks:
tr = [f for f in glob.glob(track) if os.path.isfile(f)]
for line in lines:
if line.startswith(";") or not line.strip(): continue
tr = [f for f in glob.glob(line) if os.path.isfile(f)]
for track2 in tr:
if not last_jingiel and random.choice([False, True, False, False]) and JINGIEL_FILE:
playlist.append((track2, True, False, True))
@@ -270,7 +263,7 @@ def play_playlist(playlist_path, custom_playlist: bool=False, do_shuffle=True):
procman.wait_all()
exit()
elif reload_pending:
logger.info("Reload requested, restarting with new arguments...")
logger.info("Reload requested, restarting with new arguments after song ending")
procman.wait_all()
return "reload"
track_path = os.path.abspath(os.path.expanduser(track))
@@ -286,8 +279,11 @@ def play_playlist(playlist_path, custom_playlist: bool=False, do_shuffle=True):
if return_pending and not procman.anything_playing(): continue
logger.info(f"Now playing: {track_name}")
if official: update_rds(track_name)
if i + 1 < len(playlist): logger.info(f"Next up: {os.path.basename(playlist[i+1][0])}")
if official:
rds_rt, rds_rtp = process_for_rds(track_name)
update_rds(rds_rt, rds_rtp)
logger.info(f"RT set to '{rds_rt}' (RTP: {rds_rtp})")
if (i + 1) < len(playlist): logger.info(f"Next up: {os.path.basename(playlist[i+1][0])}")
pr = procman.play(track_path, to_fade_in, to_fade_out)
ttw = pr.duration
@@ -304,7 +300,6 @@ def parse_arguments():
"""Parse command line arguments and return configuration"""
arg = sys.argv[1] if len(sys.argv) > 1 else None
do_shuffle = True
pre_track_path = None
selected_list = None
if arg:
@@ -315,7 +310,6 @@ def parse_arguments():
print()
print("Arguments:")
print(" list:playlist;options - Play custom playlist with options")
print(" /path/to/file - Play specific file first")
print()
print(f"Crossfade: {CROSSFADE_DURATION}-second crossfade is automatically applied between tracks")
exit(0)
@@ -331,29 +325,14 @@ def parse_arguments():
for option in selected_list.split(";"):
if option == "s": do_shuffle = False
selected_list = selected_list.split(";")[0]
elif os.path.isfile(arg):
pre_track_path = arg
logger.info(f"Will play requested song first: {arg}")
else: logger.error(f"Invalid argument or file not found: {arg}")
return do_shuffle, pre_track_path, selected_list
return do_shuffle, selected_list
def main():
try:
while True:
do_shuffle, pre_track_path, selected_list = parse_arguments()
if pre_track_path:
track_name = os.path.basename(pre_track_path)
logger.info(f"Now playing: {track_name}")
update_rds(track_name)
procman.play(pre_track_path).process.wait()
if exit_pending:
exit()
elif reload_pending:
logger.info("Reload requested, restarting with new arguments...")
continue # Restart the main loop
do_shuffle, selected_list = parse_arguments()
play_loop = True
while play_loop: