0
1
mirror of https://github.com/radio95-rnt/RadioPlayer.git synced 2026-02-26 13:52:00 +01:00

next track logic

This commit is contained in:
KubaPro010
2025-11-07 20:33:26 +01:00
parent cdae200009
commit 4d021c373c
6 changed files with 40 additions and 175 deletions

View File

@@ -74,7 +74,7 @@ class PlayerModule(BaseIMCModule):
def on_new_playlist(self, playlist: list[Track]) -> None:
"""This is called every new playlist"""
pass
def on_new_track(self, index: int, track: Track) -> None:
def on_new_track(self, index: int, track: Track, next_track: Track | None) -> None:
"""
Called on every track including the ones added by the active modifier, you can check for that comparing the playlists[index] and the track
"""
@@ -119,12 +119,13 @@ class ActiveModifier(BaseIMCModule):
Called at start up with the program arguments
"""
pass
def play(self, index:int, track: Track) -> tuple[Track, bool] | tuple[None, None]:
def play(self, index: int, track: Track, next_track: Track | None) -> tuple[tuple[Track, None] | tuple[Track, Track], bool] | tuple[tuple[None, None], None]:
"""
Returns a tuple, in the first case where a is the track and b is a bool, b corresponds to whether to extend the playlist, set to true when adding content instead of replacing it
When None, None is returned then that is treated as a skip, meaning the core will skip this song
The second track object is the next track, which is optional which is also only used for metadata and will not be taken in as data to play
"""
return track, False
return (track, None), False
def on_new_playlist(self, playlist: list[Track]) -> None:
"""
Same behaviour as the basic module function

View File

@@ -16,21 +16,27 @@ class Module(ActiveModifier):
if not self._imc: return
self.limit_tracks = bool(self._imc.send(self, "advisor", None))
def play(self, index: int, track: Track):
if not self.playlist: return track
def play(self, index: int, track: Track, next_track: Track | None):
if not self.playlist: return (track, next_track), False
if not os.path.exists("/tmp/radioPlayer_toplay"): open("/tmp/radioPlayer_toplay", "a").close()
with open("/tmp/radioPlayer_toplay", "r") as f: songs = [s.strip() for s in f.readlines() if s.strip()]
songs[:] = [f for s in songs for f in glob.glob(s) if os.path.isfile(f)] # expand glob
if len(songs):
song = songs.pop(0)
def get_song(pop: bool = True):
nonlocal songs
if pop: song = songs.pop(0)
else: song = songs[0]
official = True
if song.startswith("!"):
song = song[1:]
official = False # NOT FLOATINGPOINTERROR
song = Path(song).absolute()
return song, official
if len(songs):
song, official = get_song()
if self.last_track: last_track_to_fade_out = self.last_track.fade_out
else:
@@ -39,7 +45,8 @@ class Module(ActiveModifier):
if len(songs) != 0: next_track_to_fade_in = True
else:
if index + 1 < len(self.playlist): next_track_to_fade_in = self.playlist[index + 1].fade_in
if index + 1 < len(self.playlist) and next_track: next_track_to_fade_in = next_track.fade_in
elif not next_track: next_track_to_fade_in = False
else: next_track_to_fade_in = True
if not self.originals or self.originals[-1] != track: self.originals.append(track)
@@ -50,9 +57,18 @@ class Module(ActiveModifier):
logger.info(f"Playing {song.name} instead, as instructed by toplay")
self.last_track = Track(song, next_track_to_fade_in, last_track_to_fade_out, official, {})
return self.last_track, True
elif len(self.originals): self.last_track = self.originals.pop(0)
if len(songs):
# There are more tracks on the temp list
new_song, new_official = get_song(False)
self.last_track = Track(song, new_official, last_track_to_fade_out, official, {})
next_track = Track(new_song, new_official if len(songs) else next_track_to_fade_in, new_official, new_official, {})
else:
self.last_track = Track(song, next_track_to_fade_in, last_track_to_fade_out, official, {})
return (self.last_track, next_track), True
elif len(self.originals):
self.last_track = self.originals.pop(0)
next_track = self.originals[0]
else: self.last_track = track
if self.limit_tracks:
@@ -65,14 +81,14 @@ class Module(ActiveModifier):
future = datetime.datetime.fromtimestamp(now.timestamp() + last_track_duration)
if now.hour < MORNING_START and future.hour > MORNING_START:
logger.warning("Skipping track as it bleeds into the morning")
return None, None
return (None, None), None
elif now.hour < DAY_END and future.hour > DAY_END:
logger.warning("Skipping track as it bleeds into the night")
return None, None
return (None, None), None
elif future.day > now.day: # late night goes mid day, as it starts at midnight
logger.warning("Skipping track as it the next day")
return None, None
return (None, None), None
return self.last_track, False
return (self.last_track, next_track), False
activemod = Module()

View File

@@ -6,159 +6,6 @@ First of all, ther are in total only 4 modules:
- (!) Advisor (PlaylistAdvisor), this module is very important and is required to run and there can be only one of these in a core session. It is responsible for picking the playlist file itself as in a file path. This can be a scheduler, or just a constant
- Active modifier (ActiveModifier), this module is optional, but there can still be only one. This module can replace the track while playing, allowing you to skip tracks or play tracks on demand, it can also extend the playlist
```python
@dataclass
class Track:
path: str
fade_out: bool
fade_in: bool
official: bool
args: dict[str, str] | None
offset: float = 0.0
@dataclass
class Process:
process: Popen
track: str
started_at: float
duration: float
class Skeleton_ProcessManager:
processes: list[Process]
def _get_audio_duration(self, file_path): ...
def play(self, track_path: str, fade_in: bool=False, fade_out: bool=False, fade_time: int=5, offset: float=0.0) -> Process: ...
def anything_playing(self) -> bool: ...
def stop_all(self, timeout: float | None = None) -> None: ...
def wait_all(self, timeout: float | None = None) -> None: ...
class BaseIMCModule:
"""
This is not a module to be used but rather a placeholder IMC api to be used in other modules
"""
def imc(self, imc: 'InterModuleCommunication') -> None:
"""
Receive an IMC object
"""
self._imc = imc
def imc_data(self, source: 'BaseIMCModule', source_name: str | None, data: object, broadcast: bool) -> object:
"""
React to IMC data
"""
return None
class ProcmanCommunicator(BaseIMCModule):
def __init__(self, procman: Skeleton_ProcessManager) -> None:
self.procman = procman
def imc(self, imc: 'InterModuleCommunication') -> None:
super().imc(imc)
self._imc.register(self, "procman")
def imc_data(self, source: BaseIMCModule, source_name: str | None, data: object, broadcast: bool) -> object:
if broadcast: return
if isinstance(data, str) and data.lower().strip() == "raw": return self.procman
elif isinstance(data, dict):
op = data.get("op")
if not op: return
if int(op) == 0: return {"op": 0, "arg": "pong"}
elif int(op) == 1:
if arg := data.get("arg"):
return {"op": 1, "arg": self.procman._get_audio_duration(arg)}
else: return
elif int(op) == 2:
self.procman.stop_all(data.get("timeout", None))
return {"op": 2}
elif int(op) == 3:
return {"op": 3, "arg": self.procman.processes}
class PlayerModule(BaseIMCModule):
"""
Simple passive observer, this allows you to send the current track the your RDS encoder, or to your website
"""
def on_new_playlist(self, playlist: list[Track]) -> None:
"""This is called every new playlist"""
pass
def on_new_track(self, index: int, track: Track) -> None:
"""
Called on every track including the ones added by the active modifier, you can check for that comparing the playlists[index] and the track
"""
pass
def progress(self, index: int, track: Track, elapsed: float, total: float, real_total: float) -> None:
"""
Real total and total differ in that, total is how much the track lasts, but real_total will be for how long we will play it for
Runs at a frequency around 1 Hz
Please don't put any blocking or code that takes time
"""
pass
class PlaylistModifierModule:
"""
Playlist modifier, this type of module allows you to shuffle, or put jingles into your playlist
"""
def modify(self, global_args: dict, playlist: list[Track]) -> list[Track] | None:
"""
global_args are playlist global args (see radioPlayer_playlist_file.txt)
"""
return playlist
# No IMC, as we only run on new playlists
class PlaylistAdvisor(BaseIMCModule):
"""
Only one of a playlist advisor can be loaded. This module picks the playlist file to play, this can be a scheduler or just a static file
"""
def advise(self, arguments: str | None) -> str | None:
"""
Arguments are the arguments passed to the program on startup
"""
return "/path/to/playlist.txt"
def new_playlist(self) -> bool:
"""
Whether to play a new playlist, if this is True, then the player will refresh and fetch a new playlist, calling advise
"""
return False
class ActiveModifier(BaseIMCModule):
"""
This changes the next song to be played live, which means that this picks the next song, not the playlist, but this is affected by the playlist
"""
def arguments(self, arguments: str | None) -> None:
"""
Called at start up with the program arguments
"""
pass
def play(self, index:int, track: Track) -> tuple[Track, bool] | tuple[None, None]:
"""
Returns a tuple, in the first case where a is the track and b is a bool, b corresponds to whether to extend the playlist, set to true when adding content instead of replacing it
When None, None is returned then that is treated as a skip, meaning the core will skip this song
"""
return track, False
def on_new_playlist(self, playlist: list[Track]) -> None:
"""
Same behaviour as the basic module function
"""
pass
class InterModuleCommunication:
def __init__(self, modules: Sequence[BaseIMCModule | None]) -> None:
self.modules = modules
self.names_modules: dict[str, BaseIMCModule] = {}
for module in modules:
if module: module.imc(self)
def broadcast(self, source: BaseIMCModule, data: object) -> None:
"""
Send data to all modules, other than ourself
"""
source_name = next((k for k, v in self.names_modules.items() if v is source), None)
for module in [f for f in self.modules if f is not source]:
if module: module.imc_data(source, source_name, data, True)
def register(self, module: BaseIMCModule, name: str) -> bool:
"""
Register our module with a name, so we can be sent data via the send function
"""
if name in self.names_modules.keys(): return False
self.names_modules[name] = module
return True
def send(self, source: BaseIMCModule, name: str, data: object) -> object:
"""
Sends the data to a named module, and return its response
"""
if not name in self.names_modules.keys(): raise Exception
return self.names_modules[name].imc_data(source, next((k for k, v in self.names_modules.items() if v is source), None), data, False)
```
Each module shall have a python script in the modules directory. Each of the modules need to define one or more global variables in order to be seen by the core:
- module (list['PlayerModule'] or 'PlayerModule'), this shall be just the list or one passive observer class
- playlistmod ('PlaylistModifierModule', list['PlaylistModifierModule'], tuple['PlaylistModifierModule' | list['PlaylistModifierModule'], int]), module itself, list of modules or the module itself and list of them with an index integer which sets the order of modifiers (0 is first)

View File

@@ -71,7 +71,7 @@ def update_rds(track_name: str):
return prt, ','.join(list(map(str, rtp)))
class Module(PlayerModule):
def on_new_track(self, index: int, track: Track):
def on_new_track(self, index: int, track: Track, next_track: Track | None):
if track.official:
rds_rt, rds_rtp = update_rds(track.path.name)
logger.info(f"RT set to '{rds_rt}'")

View File

@@ -7,15 +7,14 @@ class Module(PlayerModule):
self.playlist = []
def on_new_playlist(self, playlist: list[Track]):
self.playlist = [str(t.path.absolute()) for t in playlist]
def on_new_track(self, index: int, track: Track):
def on_new_track(self, index: int, track: Track, next_track: Track | None):
if next_track: logger.info("Next up:", next_track.path.name)
if str(track.path) != self.playlist[index]:
# discrepancy, which means that the playing file was modified by the active modifier
# we are playing a file that was not determined in the playlist, that means it was chosen by the active modifier and made up on the fly
lines = self.playlist[:index] + [f"> ({track.path})"] + [self.playlist[index]] + self.playlist[index+1:]
logger.info("Next up:", Path(self.playlist[index]).name) # core no longer does this
else:
lines = self.playlist[:index] + [f"> {self.playlist[index]}"] + self.playlist[index+1:]
if index + 1 < len(self.playlist): logger.info("Next up:", Path(self.playlist[index+1]).name)
with open("/tmp/radioPlayer_playlist", "w") as f:
for line in lines:
try: f.write(line + "\n")

View File

@@ -182,8 +182,9 @@ def play_playlist(playlist_path: Path, starting_index: int = 0):
continue
track = playlist[song_i % len(playlist)]
next_track = playlist[song_i + 1] if song_i + 1 < len(playlist) else None
if active_modifier:
track, extend = active_modifier.play(song_i, track)
(track, next_track), extend = active_modifier.play(song_i, track, next_track)
if track is None:
song_i += 1
continue
@@ -192,7 +193,7 @@ def play_playlist(playlist_path: Path, starting_index: int = 0):
logger.info(f"Now playing: {track.path.name}")
for module in simple_modules: module.on_new_track(song_i, track)
for module in simple_modules: module.on_new_track(song_i, track, next_track)
pr = procman.play(track, cross_fade)
@@ -210,9 +211,10 @@ def play_playlist(playlist_path: Path, starting_index: int = 0):
remaining_until_end = end_time - time.monotonic()
if elapsed < 1 and remaining_until_end > 0: time.sleep(min(1 - elapsed, remaining_until_end))
if next_track: prefetch(next_track.path)
i += 1
if not extend: song_i += 1
prefetch(playlist[song_i % len(playlist)].path)
def main():
logger.info("Core is starting, loading modules")