From 82e8423178d83f5bb9043aaaea3f44071667a8d7 Mon Sep 17 00:00:00 2001 From: KubaPro010 Date: Sat, 27 Dec 2025 21:45:18 +0100 Subject: [PATCH] some changes --- modules/__init__.py | 8 +- modules/ffmpeg_procman.py | 72 ++ radioPlayer.py | 236 ++--- requirements.txt | 2 +- tinytag.py | 2052 +++++++++++++++++++++++++++++++++++++ 5 files changed, 2248 insertions(+), 122 deletions(-) create mode 100644 modules/ffmpeg_procman.py create mode 100644 tinytag.py diff --git a/modules/__init__.py b/modules/__init__.py index 2368c4a..cd07c0b 100644 --- a/modules/__init__.py +++ b/modules/__init__.py @@ -3,6 +3,7 @@ from collections.abc import Sequence from subprocess import Popen from dataclasses import dataclass from pathlib import Path +import tinytag @dataclass class Track: @@ -22,9 +23,6 @@ class Process: duration: float class ABC_ProcessManager(abc.ABC): - processes: list[Process] - @abc.abstractmethod - def _get_audio_duration(self, file_path): ... @abc.abstractmethod def play(self, track: Track) -> Process: ... @abc.abstractmethod @@ -61,13 +59,13 @@ class ProcmanCommunicator(BaseIMCModule): if int(op) == 0: return {"op": 0, "arg": "pong"} elif int(op) == 1: - if arg := data.get("arg"): return {"op": 1, "arg": self.procman._get_audio_duration(arg)} + if arg := data.get("arg"): return {"op": 1, "arg": tinytag.TinyTag().get(arg, tags=False).duration} else: return elif int(op) == 2: self.procman.stop_all(data.get("timeout", None)) return {"op": 2} elif int(op) == 3: - return {"op": 3, "arg": self.procman.processes} + raise NotImplementedError("This feature was removed.") elif int(op) == 4: return {"op": 4, "arg": self.procman.anything_playing()} elif int(op) == 5: diff --git a/modules/ffmpeg_procman.py b/modules/ffmpeg_procman.py new file mode 100644 index 0000000..f97b2e1 --- /dev/null +++ b/modules/ffmpeg_procman.py @@ -0,0 +1,72 @@ +from . import ABC_ProcessManager, Process, Track, Path, Popen, tinytag +from threading import Lock +import subprocess, time + +class ProcessManager(ABC_ProcessManager): + def __init__(self) -> None: + self.lock = Lock() + self.processes: list[Process] = [] + def _get_audio_duration(self, file_path: Path): + return tinytag.TinyTag().get(file_path, tags=False).duration + def play(self, track: Track) -> Process: + assert track.path.exists() + cmd = ['ffplay', '-nodisp', '-hide_banner', '-autoexit', '-loglevel', 'quiet'] + + duration = self._get_audio_duration(track.path.absolute()) + if not duration: raise Exception("Failed to get file duration for", track.path) + if track.offset >= duration: track.offset = max(duration - 0.1, 0) + if track.offset > 0: cmd.extend(['-ss', str(track.offset)]) + + filters = [] + if track.fade_in != 0: filters.append(f"afade=t=in:st=0:d={track.fade_in}") + if track.fade_out != 0: filters.append(f"afade=t=out:st={duration - track.fade_out}:d={track.fade_out}") + if filters: cmd.extend(['-af', ",".join(filters)]) + cmd.append(str(track.path.absolute())) + + pr = Process(Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True), track, time.monotonic(), duration - track.offset) + with self.lock: self.processes.append(pr) + return pr + def anything_playing(self) -> bool: + with self.lock: + self.processes = [p for p in self.processes if p.process.poll() is None] + return bool(self.processes) + def stop_all(self, timeout: float | None = None) -> None: + with self.lock: + for process in self.processes: + process.process.terminate() + try: process.process.wait(timeout) + except subprocess.TimeoutExpired: process.process.kill() + self.processes.clear() + def wait_all(self, timeout: float | None = None) -> None: + with self.lock: + for process in self.processes: + try: process.process.wait(timeout) + except subprocess.TimeoutExpired: process.process.terminate() + self.processes.clear() + +procman = ProcessManager() + +# This is free and unencumbered software released into the public domain. + +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. + +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +# For more information, please refer to diff --git a/radioPlayer.py b/radioPlayer.py index 3d66e82..7c41b04 100644 --- a/radioPlayer.py +++ b/radioPlayer.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 -import os, subprocess, importlib.util, importlib.machinery, types -import sys, signal, glob, time, traceback -import libcache +import os, importlib.util, importlib.machinery, types +import sys, signal, glob, time, traceback, io +import concurrent.futures from modules import * from threading import Lock @@ -16,54 +16,6 @@ def prefetch(path): MODULES_PACKAGE = "modules" MODULES_DIR = Path(__file__, "..", MODULES_PACKAGE).resolve() -class ProcessManager(ABC_ProcessManager): - def __init__(self) -> None: - self.lock = Lock() - self.processes: list[Process] = [] - self.duration_cache = libcache.Cache([]) - def _get_audio_duration(self, file_path: Path): - if result := self.duration_cache.getElement(file_path.as_posix(), False): return result - result = subprocess.run(['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', str(file_path)], capture_output=True, text=True) - if result.returncode == 0: - result = float(result.stdout.strip()) - self.duration_cache.saveElement(file_path.as_posix(), result, (60*60*2), False, True) - return result - def play(self, track: Track) -> Process: - assert track.path.exists() - cmd = ['ffplay', '-nodisp', '-hide_banner', '-autoexit', '-loglevel', 'quiet'] - - duration = self._get_audio_duration(track.path.absolute()) - if not duration: raise Exception("Failed to get file duration for", track.path) - if track.offset >= duration: track.offset = max(duration - 0.1, 0) - if track.offset > 0: cmd.extend(['-ss', str(track.offset)]) - - filters = [] - if track.fade_in != 0: filters.append(f"afade=t=in:st=0:d={track.fade_in}") - if track.fade_out != 0: filters.append(f"afade=t=out:st={duration - track.fade_out}:d={track.fade_out}") - if filters: cmd.extend(['-af', ",".join(filters)]) - cmd.append(str(track.path.absolute())) - - pr = Process(Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True), track, time.monotonic(), duration - track.offset) - with self.lock: self.processes.append(pr) - return pr - def anything_playing(self) -> bool: - with self.lock: - self.processes = [p for p in self.processes if p.process.poll() is None] - return bool(self.processes) - def stop_all(self, timeout: float | None = None) -> None: - with self.lock: - for process in self.processes: - process.process.terminate() - try: process.process.wait(timeout) - except subprocess.TimeoutExpired: process.process.kill() - self.processes.clear() - def wait_all(self, timeout: float | None = None) -> None: - with self.lock: - for process in self.processes: - try: process.process.wait(timeout) - except subprocess.TimeoutExpired: process.process.terminate() - self.processes.clear() - class PlaylistParser: def __init__(self, output: log95.TextIO): self.logger = log95.log95("PARSER", output=output) @@ -99,19 +51,25 @@ class PlaylistParser: if line.startswith("|"): # No file name, we're defining global arguments args = line.removeprefix("|").split(";") for arg in args: - key, val = arg.split("=", 1) - global_arguments[key] = val + if "=" in arg: + key, val = arg.split("=", 1) + arguments[key] = val + else: + arguments[arg] = True else: line, args = line.split("|", 1) args = args.split(";") for arg in args: - key, val = arg.split("=", 1) - arguments[key] = val + if "=" in arg: + key, val = arg.split("=", 1) + arguments[key] = val + else: + arguments[arg] = True out.append(([f for f in glob.glob(line) if Path(f).is_file()], arguments)) return global_arguments, out class ModuleManager: - def __init__(self, output: log95.TextIO) -> types.NoneType: + def __init__(self, output: log95.TextIO) -> None: self.simple_modules: list[PlayerModule] = [] self.playlist_modifier_modules: list[PlaylistModifierModule] = [] self.playlist_advisor: PlaylistAdvisor | None = None @@ -146,51 +104,61 @@ class ModuleManager: module.__dict__['_log_out'] = self.logger.output self.modules.append((spec, module, module_name)) def start_modules(self, arg): + procman = None """Executes the module by the python interpreter""" - procman = ProcessManager() - for (spec, module, module_name) in self.modules: + def timed_loader(spec: importlib.machinery.ModuleSpec, module: types.ModuleType): assert spec.loader - try: - start = time.perf_counter() - if os.name == "posix": - def handler(signum, frame): raise TimeoutError("Module loading timed out") - signal.signal(signal.SIGALRM, handler) - signal.alarm(5) - try: spec.loader.exec_module(module) - except TimeoutError: self.logger.error(f"Module {module_name} took too long to load and was skipped.") - finally: - if os.name == "posix": signal.alarm(0) - if (time_took := time.perf_counter() - start) > 0.15: self.logger.warning(f"{module_name} took {time_took:.1f}s to start") - except Exception as e: - traceback.print_exc(file=self.logger.output) - self.logger.error(f"Failed loading {module_name} due to {e}, continuing") - continue - - if md := getattr(module, "module", None): - if isinstance(md, list): self.simple_modules.extend(md) - else: self.simple_modules.append(md) - if md := getattr(module, "playlistmod", None): - if isinstance(md, tuple): - md, index = md - if isinstance(md, list): self.playlist_modifier_modules[index:index] = md - else: self.playlist_modifier_modules.insert(index, md) - elif isinstance(md, list): self.playlist_modifier_modules.extend(md) - else: self.playlist_modifier_modules.append(md) - if md := getattr(module, "advisor", None): - if self.playlist_advisor: raise Exception("Multiple playlist advisors") - self.playlist_advisor = md - if md := getattr(module, "activemod", None): - if self.active_modifier: raise Exception("Multiple active modifiers") - self.active_modifier = md - if md := getattr(module, "procman", None): - if not isinstance(md, ABC_ProcessManager): - self.logger.error("Modular process manager does not inherit from ABC_ProcessManager.") + start = time.perf_counter() + spec.loader.exec_module(module) + duration = time.perf_counter() - start + return duration + with concurrent.futures.ThreadPoolExecutor() as executor: + for (spec, module, module_name) in self.modules: + try: + future = executor.submit(timed_loader, spec, module) + try: + time_took = future.result(5) + if time_took > 0.15: self.logger.warning(f"{module_name} took {time_took:.1f}s to start") + except concurrent.futures.TimeoutError: + self.logger.error(f"Module {module_name} timed out.") + continue + except Exception as e: + traceback.print_exc(file=self.logger.output) + self.logger.error(f"Failed loading {module_name} due to {e}, continuing") continue - if procman.anything_playing(): procman.stop_all() - procman = md - InterModuleCommunication(self.simple_modules + [self.playlist_advisor, ProcmanCommunicator(procman), self.active_modifier]) + + if md := getattr(module, "module", None): + if isinstance(md, list): self.simple_modules.extend(md) + else: self.simple_modules.append(md) + if md := getattr(module, "playlistmod", None): + if isinstance(md, tuple): + md, index = md + if isinstance(md, list): self.playlist_modifier_modules[index:index] = md + else: self.playlist_modifier_modules.insert(index, md) + elif isinstance(md, list): self.playlist_modifier_modules.extend(md) + else: self.playlist_modifier_modules.append(md) + if md := getattr(module, "advisor", None): + if self.playlist_advisor: raise Exception("Multiple playlist advisors") + self.playlist_advisor = md + if md := getattr(module, "activemod", None): + if self.active_modifier: raise Exception("Multiple active modifiers") + self.active_modifier = md + if md := getattr(module, "procman", None): + if procman: raise Exception("Multiple procmans") + if not isinstance(md, ABC_ProcessManager): + self.logger.error("Modular process manager does not inherit from ABC_ProcessManager.") + continue + procman = md if self.active_modifier: self.active_modifier.arguments(arg) + if not self.playlist_advisor: self.logger.warning("Playlist advisor was not found. Beta mode of advisor-less is running (playlist modifiers will not work)") + if not procman: + self.logger.critical_error("Missing process mananger.") + raise SystemExit("Missing process mananger.") + InterModuleCommunication(self.simple_modules + [self.playlist_advisor, ProcmanCommunicator(procman), self.active_modifier]) return procman + def advisor_advise(self, arguments: str | None): + if not self.playlist_advisor: return None + return self.playlist_advisor.advise(arguments) class RadioPlayer: def __init__(self, arg: str | None, output: log95.TextIO): @@ -224,12 +192,13 @@ class RadioPlayer: self.logger.info("Core starting, loading modules") self.modman.load_modules() self.procman = self.modman.start_modules(self.arg) - if not self.modman.playlist_advisor: self.logger.warning("Playlist advisor was not found. Beta mode of advisor-less is running (playlist modifiers will not work)") def play_once(self): """Plays a single playlist""" - if self.modman.playlist_advisor: - if not (playlist_path := self.modman.playlist_advisor.advise(self.arg)): return + if not (playlist_path := self.modman.advisor_advise(self.arg)): + max_iterator = 1 + playlist = None + else: try: global_args, parsed = self.parser.parse(playlist_path) except Exception as e: self.logger.info(f"Exception ({e}) while parsing playlist, retrying in 15 seconds...");traceback.print_exc(file=self.logger.output) @@ -237,22 +206,23 @@ class RadioPlayer: return playlist: list[Track] | None = [] - [playlist.extend(Track(Path(line).absolute(), 0, 0, True, args) for line in lns) for (lns, args) in parsed] # i can read this, i think + for lines, args in parsed: + for line in lines: + playlist.append(Track(Path(line).absolute(), 0, 0, True, args)) - [(playlist := module.modify(global_args, playlist) or playlist) for module in self.modman.playlist_modifier_modules if module] # yep + for module in filter(None, self.modman.playlist_modifier_modules): playlist = module.modify(global_args, playlist) or playlist assert len(playlist) prefetch(playlist[0].path) - [mod.on_new_playlist(playlist, global_args) for mod in self.modman.simple_modules + [self.modman.active_modifier] if mod] # one liner'd everything + for module in filter(None, self.modman.simple_modules + [self.modman.active_modifier]): module.on_new_playlist(playlist, global_args) max_iterator = len(playlist) - else: - max_iterator = 1 - playlist = None + return self._play(playlist, max_iterator) + + def _play(self, playlist: list[Track] | None, max_iterator: int): + assert self.procman return_pending = track = False song_i = i = 0 - assert self.procman - def get_track(): nonlocal song_i, playlist, max_iterator track = None @@ -303,6 +273,7 @@ class RadioPlayer: [module.progress(song_i, track, time.monotonic() - pr.started_at, pr.duration, end_time - pr.started_at) for module in self.modman.simple_modules if module] if (elapsed := time.monotonic() - start) < 1 and (remaining_until_end := end_time - time.monotonic()) > 0: time.sleep(min(1 - elapsed, remaining_until_end)) + prefetch(next_track) i += 1 if not extend: song_i += 1 @@ -320,16 +291,49 @@ class RadioPlayer: traceback.print_exc(file=self.logger.output) raise +class RotatingLog(io.TextIOWrapper): + def write(self, s: str) -> int: + if self.tell() > 2_500_000: + self.truncate(0) + self.seek(0) + return super().write(s) + def main(): log_file_path = Path("/tmp/radioPlayer_log") log_file_path.touch() - core = RadioPlayer((" ".join(sys.argv[1:]) if len(sys.argv) > 1 else None), open(log_file_path, "w")) - try: - core.start() - signal.signal(signal.SIGINT, core.handle_sigint) - core.loop() - except SystemExit: - try: core.shutdown() - except BaseException: traceback.print_exc() - raise \ No newline at end of file + with RotatingLog(open(log_file_path, "wb", buffering=0), "utf-8") as f: + core = RadioPlayer((" ".join(sys.argv[1:]) if len(sys.argv) > 1 else None), f) + try: + core.start() + signal.signal(signal.SIGINT, core.handle_sigint) + core.loop() + except SystemExit: + try: core.shutdown() + except BaseException: traceback.print_exc() + raise + +# This is free and unencumbered software released into the public domain. + +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. + +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +# For more information, please refer to diff --git a/requirements.txt b/requirements.txt index b29a288..a5a0658 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ libcache -log95==1.3 +log95 unidecode \ No newline at end of file diff --git a/tinytag.py b/tinytag.py new file mode 100644 index 0000000..257085f --- /dev/null +++ b/tinytag.py @@ -0,0 +1,2052 @@ +# SPDX-FileCopyrightText: 2014-2025 tinytag Contributors +# SPDX-License-Identifier: MIT + +# tinytag - an audio file metadata reader +# http://github.com/tinytag/tinytag + +# MIT License + +# Copyright (c) 2014-2025 Tom Wallroth, Mat (mathiascode), et al. + +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +"""Audio file metadata reader.""" + +from __future__ import annotations +from binascii import a2b_base64 +from io import BytesIO +from os import PathLike, SEEK_CUR, SEEK_END, environ, fsdecode +from struct import unpack + +TYPE_CHECKING = False + +# Lazy imports for type checking +if TYPE_CHECKING: + from collections.abc import Callable, Iterator # pylint: disable-all + from typing import Any, BinaryIO, Dict, List, Union + + _StringListDict = Dict[str, List[str]] + _ImageListDict = Dict[str, List["Image"]] + _DataTreeDict = Dict[ + bytes, Union['_DataTreeDict', Callable[..., Dict[str, Any]]]] +else: + _StringListDict = _ImageListDict = _DataTreeDict = dict + +# some of the parsers can print debug info +_DEBUG = bool(environ.get('TINYTAG_DEBUG')) + + +class TinyTagException(Exception): + """Base class for exceptions.""" + + +class ParseError(TinyTagException): + """Parsing an audio file failed.""" + + +class UnsupportedFormatError(TinyTagException): + """File format is not supported.""" + + +class TinyTag: + """A class containing audio file properties and metadata fields.""" + + SUPPORTED_FILE_EXTENSIONS = ( + '.mp1', '.mp2', '.mp3', + '.oga', '.ogg', '.opus', '.spx', + '.wav', '.flac', '.wma', + '.m4b', '.m4a', '.m4r', '.m4v', '.mp4', '.aax', '.aaxc', + '.aiff', '.aifc', '.aif', '.afc' + ) + _OTHER_PREFIX = 'other.' + _file_extension_mapping: dict[tuple[str, ...], type[TinyTag]] | None = None + + def __init__(self) -> None: + self.filename: str | None = None + self.filesize = 0 + + self.duration: float | None = None + self.channels: int | None = None + self.bitrate: float | None = None + self.bitdepth: int | None = None + self.samplerate: int | None = None + + self.artist: str | None = None + self.albumartist: str | None = None + self.composer: str | None = None + self.album: str | None = None + self.disc: int | None = None + self.disc_total: int | None = None + self.title: str | None = None + self.track: int | None = None + self.track_total: int | None = None + self.genre: str | None = None + self.year: str | None = None + self.comment: str | None = None + + self.images = Images() + self.other: _StringListDict = OtherFields() + + self._filehandler: BinaryIO | None = None + self._default_encoding: str | None = None # override for some formats + self._parse_duration = True + self._parse_tags = True + self._load_image = False + self._tags_parsed = False + self.__dict__: dict[str, str | float | Images | OtherFields | None] + + @classmethod + def get(cls, + filename: bytes | str | PathLike[Any] | None = None, + file_obj: BinaryIO | None = None, + tags: bool = True, + duration: bool = True, + image: bool = False, + encoding: str | None = None, + ignore_errors: bool | None = None) -> TinyTag: + """Return a tag object for an audio file.""" + should_close_file = file_obj is None + filename_str = None + if filename: + if should_close_file: + # pylint: disable=consider-using-with + file_obj = open(filename, 'rb') + filename_str = fsdecode(filename) + if file_obj is None: + raise ValueError( + 'Either filename or file_obj argument is required') + if ignore_errors is not None: + # pylint: disable=import-outside-toplevel + from warnings import warn + warn('ignore_errors argument is obsolete, and will be removed in ' + 'the future', DeprecationWarning, stacklevel=2) + try: + # pylint: disable=protected-access + file_obj.seek(0, SEEK_END) + filesize = file_obj.tell() + file_obj.seek(0) + parser_class = cls._get_parser_class(filename_str, file_obj) + tag = parser_class() + tag._filehandler = file_obj + tag._default_encoding = encoding + tag.filename = filename_str + tag.filesize = filesize + if filesize > 0: + try: + tag._load(tags=tags, duration=duration, image=image) + except Exception as exc: + raise ParseError(exc) from exc + return tag + finally: + if should_close_file: + file_obj.close() + + @classmethod + def is_supported(cls, filename: bytes | str | PathLike[Any]) -> bool: + """Check if a specific file is supported based on its file + extension.""" + filename_str = fsdecode(filename) + return cls._get_parser_for_filename(filename_str) is not None + + def as_dict(self) -> dict[str, str | float | list[str]]: + """Return a flat dictionary representation of available + metadata.""" + fields: dict[str, str | float | list[str]] = {} + for key, value in self.__dict__.items(): + if key.startswith('_'): + continue + if isinstance(value, Images): + continue + if not isinstance(value, OtherFields): + if value is None: + continue + if key != 'filename' and isinstance(value, str): + fields[key] = [value] + else: + fields[key] = value + continue + for other_key, other_values in value.items(): + other_fields = fields.get(other_key) + if not isinstance(other_fields, list): + other_fields = fields[other_key] = [] + other_fields += other_values + return fields + + @classmethod + def _get_parser_for_filename(cls, filename: str) -> type[TinyTag] | None: + if cls._file_extension_mapping is None: + cls._file_extension_mapping = { + ('.mp1', '.mp2', '.mp3'): _ID3, + ('.oga', '.ogg', '.opus', '.spx'): _Ogg, + ('.wav',): _Wave, + ('.flac',): _Flac, + ('.wma',): _Wma, + ('.m4b', '.m4a', '.m4r', '.m4v', '.mp4', + '.aax', '.aaxc'): _MP4, + ('.aiff', '.aifc', '.aif', '.afc'): _Aiff, + } + filename = filename.lower() + for ext, tagclass in cls._file_extension_mapping.items(): + if filename.endswith(ext): + return tagclass + return None + + @classmethod + def _get_parser_for_file_handle( + cls, + filehandle: BinaryIO + ) -> type[TinyTag] | None: + # https://en.wikipedia.org/wiki/List_of_file_signatures + header = filehandle.read(35) + filehandle.seek(0) + if header.startswith(b'ID3') or header.startswith(b'\xff\xfb'): + return _ID3 + if header.startswith(b'fLaC'): + return _Flac + if ((header[4:8] == b'ftyp' + and header[8:11] in {b'M4A', b'M4B', b'aax'}) + or b'\xff\xf1' in header): + return _MP4 + if (header.startswith(b'OggS') + and (header[29:33] == b'FLAC' or header[29:35] == b'vorbis' + or header[28:32] == b'Opus' or header[29:34] == b'Speex')): + return _Ogg + if header.startswith(b'RIFF') and header[8:12] == b'WAVE': + return _Wave + if header.startswith(b'\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00' + b'\xAA\x00\x62\xCE\x6C'): + return _Wma + if header.startswith(b'FORM') and header[8:12] in {b'AIFF', b'AIFC'}: + return _Aiff + return None + + @classmethod + def _get_parser_class( + cls, + filename: str | None = None, + filehandle: BinaryIO | None = None + ) -> type[TinyTag]: + if cls != TinyTag: + return cls + if filename: + parser_class = cls._get_parser_for_filename(filename) + if parser_class is not None: + return parser_class + # try determining the file type by magic byte header + if filehandle: + parser_class = cls._get_parser_for_file_handle(filehandle) + if parser_class is not None: + return parser_class + raise UnsupportedFormatError( + 'No tag reader found to support file type') + + def _load(self, tags: bool, duration: bool, image: bool = False) -> None: + self._parse_tags = tags + self._parse_duration = duration + self._load_image = image + if self._filehandler is None: + raise ValueError("File handle is required") + if tags: + self._parse_tag(self._filehandler) + if duration: + if tags: # rewind file if the tags were already parsed + self._filehandler.seek(0) + self._determine_duration(self._filehandler) + + def _set_field(self, fieldname: str, value: str | float, + check_conflict: bool = True) -> None: + if fieldname.startswith(self._OTHER_PREFIX): + fieldname = fieldname[len(self._OTHER_PREFIX):] + if check_conflict and fieldname in self.__dict__: + fieldname = '_' + fieldname + if fieldname not in self.other: + self.other[fieldname] = [] + self.other[fieldname].append(str(value)) + if _DEBUG: + print(f'Adding value "{value} to field "{fieldname}"') + return + old_value = self.__dict__.get(fieldname) + new_value = value + if isinstance(new_value, str): + # First value goes in tag, others in tag.other + values = new_value.split('\x00') + for index, i_value in enumerate(values): + if index or old_value and i_value != old_value: + self._set_field( + self._OTHER_PREFIX + fieldname, i_value, + check_conflict=False) + continue + new_value = i_value + if old_value: + return + elif not new_value and old_value: + # Prioritize non-zero integer values + return + if _DEBUG: + print(f'Setting field "{fieldname}" to "{new_value!r}"') + self.__dict__[fieldname] = new_value + + def _determine_duration(self, fh: BinaryIO) -> None: + raise NotImplementedError + + def _parse_tag(self, fh: BinaryIO) -> None: + raise NotImplementedError + + def _update(self, other: TinyTag) -> None: + # update the values of this tag with the values from another tag + for key, value in other.__dict__.items(): + if key.startswith('_'): + continue + if isinstance(value, OtherFields): + for other_key, other_values in other.other.items(): + for other_value in other_values: + self._set_field( + self._OTHER_PREFIX + other_key, other_value, + check_conflict=False) + elif isinstance(value, Images): + self.images._update(value) # pylint: disable=protected-access + elif value is not None: + self._set_field(key, value) + + @staticmethod + def _unpad(s: str) -> str: + # certain strings *may* be terminated with a zero byte at the end + return s.strip('\x00') + + def get_image(self) -> bytes | None: + """Deprecated, use 'images.any' instead.""" + from warnings import warn # pylint: disable=import-outside-toplevel + warn('get_image() is deprecated, and will be removed in the future. ' + "Use 'images.any' instead.", + DeprecationWarning, stacklevel=2) + image = self.images.any + return image.data if image is not None else None + + @property + def audio_offset(self) -> None: # pylint: disable=useless-return + """Obsolete.""" + from warnings import warn # pylint: disable=import-outside-toplevel + warn("'audio_offset' attribute is obsolete, and will be " + 'removed in the future', + DeprecationWarning, stacklevel=2) + return None + + @property + def extra(self) -> dict[str, str]: + """Deprecated, use 'other' instead.""" + from warnings import warn # pylint: disable=import-outside-toplevel + warn("'extra' attribute is deprecated, and will be " + "removed in the future. Use 'other' instead.", + DeprecationWarning, stacklevel=2) + extra_keys = {'copyright', 'initial_key', 'isrc', 'lyrics', 'url'} + return {k: v[0] for k, v in self.other.items() if k in extra_keys} + + +class Images: + """A class containing images embedded in an audio file.""" + _OTHER_PREFIX = 'other.' + + def __init__(self) -> None: + self.front_cover: Image | None = None + self.back_cover: Image | None = None + self.media: Image | None = None + + self.other: _ImageListDict = OtherImages() + self.__dict__: dict[str, Image | OtherImages | None] + + @property + def any(self) -> Image | None: + """Return a cover image. + If not present, fall back to any other available image. + """ + for value in self.__dict__.values(): + if isinstance(value, OtherImages): + for other_images in value.values(): + for image in other_images: + return image + continue + if value is not None: + return value + return None + + def as_dict(self) -> dict[str, list[Image]]: + """Return a flat dictionary representation of available images.""" + images: dict[str, list[Image]] = {} + for key, value in self.__dict__.items(): + if not isinstance(value, OtherImages): + if value is not None: + images[key] = [value] + continue + for other_key, other_values in value.items(): + other_images = images.get(other_key) + if not isinstance(other_images, list): + other_images = images[other_key] = [] + other_images += other_values + return images + + def _set_field(self, fieldname: str, value: Image) -> None: + old_value = self.__dict__.get(fieldname) + if fieldname.startswith(self._OTHER_PREFIX) or old_value is not None: + fieldname = fieldname[len(self._OTHER_PREFIX):] + other_values = self.other.get(fieldname, []) + other_values.append(value) + if _DEBUG: + print(f'Setting other image field "{fieldname}"') + self.other[fieldname] = other_values + return + if _DEBUG: + print(f'Setting image field "{fieldname}"') + self.__dict__[fieldname] = value + + def _update(self, other: Images) -> None: + for key, value in other.__dict__.items(): + if isinstance(value, OtherImages): + for other_key, other_values in value.items(): + for image_other in other_values: + self._set_field( + self._OTHER_PREFIX + other_key, image_other) + continue + if value is not None: + self._set_field(key, value) + + +class Image: + """A class representing an image embedded in an audio file.""" + def __init__(self, + name: str, + data: bytes, + mime_type: str | None = None) -> None: + self.name = name + self.data = data + self.mime_type = mime_type + self.description: str | None = None + + def __repr__(self) -> str: + variables = vars(self).copy() + data = variables.get("data") + if data is not None: + variables["data"] = (data[:45] + b'..') if len(data) > 45 else data + data_str = ', '.join(f'{k}={v!r}' for k, v in variables.items()) + return f'{type(self).__name__}({data_str})' + + +class OtherFields(_StringListDict): + """A dictionary containing additional metadata fields of an audio file.""" + + +class OtherImages(_ImageListDict): + """A dictionary containing additional images embedded in an audio file.""" + + +class _MP4(TinyTag): + """MP4 Audio Parser. + + https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/Metadata/Metadata.html + https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap2/qtff2.html + """ + + _CUSTOM_FIELD_NAME_MAPPING = { + 'artists': 'artist', + 'conductor': 'other.conductor', + 'discsubtitle': 'other.set_subtitle', + 'initialkey': 'other.initial_key', + 'isrc': 'other.isrc', + 'language': 'other.language', + 'lyricist': 'other.lyricist', + 'media': 'other.media', + 'website': 'other.url', + 'license': 'other.license', + 'barcode': 'other.barcode', + 'catalognumber': 'other.catalog_number', + } + _IMAGE_MIME_TYPES = { + 13: 'image/jpeg', + 14: 'image/png' + } + _UNPACK_FORMATS = { + 1: '>b', + 2: '>h', + 4: '>i', + 8: '>q' + } + _VERSIONED_ATOMS = {b'meta', b'stsd'} # those have an extra 4 byte header + _FLAGGED_ATOMS = {b'stsd'} # these also have an extra 4 byte header + _ILST_PATH = [b'ftyp', b'moov', b'udta', b'meta', b'ilst'] + + _audio_data_tree: _DataTreeDict | None = None + _meta_data_tree: _DataTreeDict | None = None + + def _determine_duration(self, fh: BinaryIO) -> None: + # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap3/qtff3.html + if _MP4._audio_data_tree is None: + _MP4._audio_data_tree = { + b'moov': { + b'mvhd': _MP4._parse_mvhd, + b'trak': {b'mdia': {b"minf": {b"stbl": {b"stsd": { + b'mp4a': _MP4._parse_audio_sample_entry_mp4a, + b'alac': _MP4._parse_audio_sample_entry_alac + }}}}} + } + } + self._traverse_atoms(fh, path=_MP4._audio_data_tree) + + def _parse_tag(self, fh: BinaryIO) -> None: + # The parser tree: Each key is an atom name which is traversed if + # existing. Leaves of the parser tree are callables which receive + # the atom data. Callables return {fieldname: value} which is updates + # the TinyTag. + if _MP4._meta_data_tree is None: + _MP4._meta_data_tree = {b'moov': {b'udta': {b'meta': {b'ilst': { + # http://atomicparsley.sourceforge.net/mpeg-4files.html + # https://metacpan.org/dist/Image-ExifTool/source/lib/Image/ExifTool/QuickTime.pm#L3093 + b'\xa9ART': {b'data': _MP4._data_parser('artist')}, + b'\xa9alb': {b'data': _MP4._data_parser('album')}, + b'\xa9cmt': {b'data': _MP4._data_parser('comment')}, + b'\xa9com': {b'data': _MP4._data_parser('composer')}, + b'\xa9con': {b'data': _MP4._data_parser('other.conductor')}, + b'\xa9day': {b'data': _MP4._data_parser('year')}, + b'\xa9des': {b'data': _MP4._data_parser('other.description')}, + b'\xa9dir': {b'data': _MP4._data_parser('other.director')}, + b'\xa9gen': {b'data': _MP4._data_parser('genre')}, + b'\xa9grp': {b'data': _MP4._data_parser('other.grouping')}, + b'\xa9lyr': {b'data': _MP4._data_parser('other.lyrics')}, + b'\xa9mvc': { + b'data': _MP4._data_parser('other.movement_total') + }, + b'\xa9mvi': {b'data': _MP4._data_parser('other.movement')}, + b'\xa9mvn': { + b'data': _MP4._data_parser('other.movement_name') + }, + b'\xa9nam': {b'data': _MP4._data_parser('title')}, + b'\xa9pub': {b'data': _MP4._data_parser('other.publisher')}, + b'\xa9too': {b'data': _MP4._data_parser('other.encoded_by')}, + b'\xa9wrk': {b'data': _MP4._data_parser('other.work')}, + b'\xa9wrt': {b'data': _MP4._data_parser('composer')}, + b'aART': {b'data': _MP4._data_parser('albumartist')}, + b'cprt': {b'data': _MP4._data_parser('other.copyright')}, + b'desc': {b'data': _MP4._data_parser('other.description')}, + b'disk': {b'data': _MP4._nums_parser('disc', 'disc_total')}, + b'gnre': {b'data': _MP4._parse_id3v1_genre}, + b'shwm': {b'data': _MP4._data_parser('other.show_movement')}, + b'trkn': {b'data': _MP4._nums_parser('track', 'track_total')}, + b'tmpo': {b'data': _MP4._data_parser('other.bpm')}, + b'covr': {b'data': _MP4._parse_cover_image}, + b'----': _MP4._parse_custom_field, + }}}}} + self._traverse_atoms(fh, path=_MP4._meta_data_tree) + + def _traverse_atoms(self, + fh: BinaryIO, + path: _DataTreeDict, + stop_pos: int | None = None, + curr_path: list[bytes] | None = None) -> None: + header_len = ext_size_len = 8 + atom_header = fh.read(header_len) + while len(atom_header) == header_len: + atom_size = unpack('>I', atom_header[:4])[0] + atom_type = atom_header[4:] + if curr_path is None: # keep track how we traversed in the tree + curr_path = [atom_type] + if atom_size == 1: # 64-bit size + ext_size_header = fh.read(ext_size_len) + if len(ext_size_header) == ext_size_len: + atom_size = unpack('>Q', ext_size_header)[0] - ext_size_len + atom_size -= header_len + if atom_size <= 0: # empty atom, jump to next one + atom_header = fh.read(header_len) + continue + if _DEBUG: + print(f'{" " * 4 * len(curr_path)} ' + f'pos: {fh.tell() - header_len} ' + f'atom: {atom_type!r} len: {atom_size + header_len}') + if atom_type in self._VERSIONED_ATOMS: # jump atom version for now + fh.seek(4, SEEK_CUR) + atom_size -= 4 + if atom_type in self._FLAGGED_ATOMS: # jump atom flags for now + fh.seek(4, SEEK_CUR) + atom_size -= 4 + sub_path = path.get(atom_type, None) + # if the path leaf is a dict, traverse deeper into the tree: + if isinstance(sub_path, dict): + atom_end_pos = fh.tell() + atom_size + self._traverse_atoms(fh, path=sub_path, stop_pos=atom_end_pos, + curr_path=curr_path + [atom_type]) + # if the path-leaf is a callable, call it on the atom data + elif callable(sub_path): + for fieldname, value in sub_path(fh.read(atom_size)).items(): + if _DEBUG: + print(' ' * 4 * len(curr_path), 'FIELD: ', fieldname) + if isinstance(value, Image): + if self._load_image: + # pylint: disable=protected-access + self.images._set_field( + fieldname[len('images.'):], value) + elif isinstance(value, list): + for subval in value: + self._set_field(fieldname, subval) + else: + self._set_field(fieldname, value) + # unknown data atom, try to parse it + elif curr_path == self._ILST_PATH: + atom_end_pos = fh.tell() + atom_size + field_name = ( + self._OTHER_PREFIX + atom_type.decode('latin-1').lower() + ) + fh.seek(-header_len, SEEK_CUR) + self._traverse_atoms( + fh, + path={atom_type: {b'data': self._data_parser(field_name)}}, + stop_pos=atom_end_pos, curr_path=curr_path + [atom_type]) + # if no action was specified using dict or callable, jump over atom + else: + fh.seek(atom_size, SEEK_CUR) + # check if we have reached the end of this branch: + if stop_pos and fh.tell() >= stop_pos: + return # return to parent (next parent node in tree) + atom_header = fh.read(header_len) # read next atom + + @classmethod + def _data_parser(cls, fieldname: str) -> Callable[[bytes], dict[str, str]]: + def _parse_data_atom(data_atom: bytes) -> dict[str, str]: + data_type = unpack('>I', data_atom[:4])[0] + data = data_atom[8:] + value = None + if data_type == 1: # UTF-8 string + value = data.decode('utf-8', 'replace') + elif data_type == 21: # BE signed integer + fmts = cls._UNPACK_FORMATS + data_len = len(data) + if data_len in fmts: + value = str(unpack(fmts[data_len], data)[0]) + if value: + return {fieldname: value} + return {} + return _parse_data_atom + + @classmethod + def _nums_parser( + cls, fieldname1: str, fieldname2: str + ) -> Callable[[bytes], dict[str, int]]: + def _parse_nums(data_atom: bytes) -> dict[str, int]: + number_data = data_atom[8:14] + numbers = unpack('>3H', number_data) + # for some reason the first number is always irrelevant. + return {fieldname1: numbers[1], fieldname2: numbers[2]} + return _parse_nums + + @classmethod + def _parse_id3v1_genre(cls, data_atom: bytes) -> dict[str, str]: + # dunno why genre is offset by -1 but that's how mutagen does it + idx = unpack('>H', data_atom[8:])[0] - 1 + result = {} + # pylint: disable=protected-access + if idx < len(_ID3._ID3V1_GENRES): + result['genre'] = _ID3._ID3V1_GENRES[idx] + return result + + @classmethod + def _parse_cover_image(cls, data_atom: bytes) -> dict[str, Image]: + data_type = unpack('>I', data_atom[:4])[0] + image = Image( + 'front_cover', data_atom[8:], cls._IMAGE_MIME_TYPES.get(data_type)) + return {'images.front_cover': image} + + @classmethod + def _read_extended_descriptor(cls, esds_atom: BinaryIO) -> None: + for _i in range(4): + if esds_atom.read(1) != b'\x80': + break + + @classmethod + def _parse_custom_field(cls, data: bytes) -> dict[str, list[str]]: + fh = BytesIO(data) + header_len = 8 + field_name = None + values = [] + atom_header = fh.read(header_len) + while len(atom_header) == header_len: + atom_size = unpack('>I', atom_header[:4])[0] - header_len + atom_type = atom_header[4:] + if atom_type == b'name': + atom_value = fh.read(atom_size)[4:].lower() + field_name = atom_value.decode('utf-8', 'replace') + # pylint: disable=protected-access + field_name = cls._CUSTOM_FIELD_NAME_MAPPING.get( + field_name, TinyTag._OTHER_PREFIX + field_name) + elif atom_type == b'data' and field_name: + data_atom = fh.read(atom_size) + parser = cls._data_parser(field_name) + atom_values = parser(data_atom) + if field_name in atom_values: + values.append(atom_values[field_name]) + else: + fh.seek(atom_size, SEEK_CUR) + atom_header = fh.read(header_len) # read next atom + if field_name and values: + return {field_name: values} + return {} + + @classmethod + def _parse_audio_sample_entry_mp4a(cls, data: bytes) -> dict[str, int]: + # this atom also contains the esds atom: + # https://ffmpeg.org/doxygen/0.6/mov_8c-source.html + # http://xhelmboyx.tripod.com/formats/mp4-layout.txt + # http://sasperger.tistory.com/103 + + # jump over version and flags + channels = unpack('>H', data[16:18])[0] + # jump over bit_depth, QT compr id & pkt size + sr = unpack('>I', data[22:26])[0] + + # ES Description Atom + esds_atom_size = unpack('>I', data[28:32])[0] + esds_atom = BytesIO(data[36:36 + esds_atom_size]) + esds_atom.seek(5, SEEK_CUR) # jump over version, flags and tag + + # ES Descriptor + cls._read_extended_descriptor(esds_atom) + esds_atom.seek(4, SEEK_CUR) # jump over ES id, flags and tag + + # Decoder Config Descriptor + cls._read_extended_descriptor(esds_atom) + esds_atom.seek(9, SEEK_CUR) + avg_br = unpack('>I', esds_atom.read(4))[0] / 1000 # kbit/s + return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br} + + @classmethod + def _parse_audio_sample_entry_alac(cls, data: bytes) -> dict[str, int]: + # https://github.com/macosforge/alac/blob/master/ALACMagicCookieDescription.txt + bitdepth = data[45] + channels = data[49] + avg_br, sr = unpack('>II', data[56:64]) + avg_br /= 1000 # kbit/s + return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br, + 'bitdepth': bitdepth} + + @classmethod + def _parse_mvhd(cls, data: bytes) -> dict[str, float]: + # http://stackoverflow.com/a/3639993/1191373 + version = data[0] + # jump over flags, create & mod times + if version == 0: # uses 32 bit integers for timestamps + time_scale, duration = unpack('>II', data[12:20]) + else: # version == 1: # uses 64-bit integers for timestamps + time_scale, duration = unpack('>IQ', data[20:32]) + return {'duration': duration / time_scale} + + +class _ID3(TinyTag): + """MP3 Parser.""" + + _ID3_MAPPING = { + # Mapping from Frame ID to a field of the TinyTag + # https://exiftool.org/TagNames/ID3.html + b'COMM': 'comment', b'COM': 'comment', + b'TRCK': 'track', b'TRK': 'track', + b'TYER': 'year', b'TYE': 'year', b'TDRC': 'year', + b'TALB': 'album', b'TAL': 'album', + b'TPE1': 'artist', b'TP1': 'artist', + b'TIT2': 'title', b'TT2': 'title', + b'TCON': 'genre', b'TCO': 'genre', + b'TPOS': 'disc', b'TPA': 'disc', + b'TPE2': 'albumartist', b'TP2': 'albumartist', + b'TCOM': 'composer', b'TCM': 'composer', + b'WOAR': 'other.url', b'WAR': 'other.url', + b'TSRC': 'other.isrc', b'TRC': 'other.isrc', + b'TCOP': 'other.copyright', b'TCR': 'other.copyright', + b'TBPM': 'other.bpm', b'TBP': 'other.bpm', + b'TKEY': 'other.initial_key', b'TKE': 'other.initial_key', + b'TLAN': 'other.language', b'TLA': 'other.language', + b'TPUB': 'other.publisher', b'TPB': 'other.publisher', + b'USLT': 'other.lyrics', b'ULT': 'other.lyrics', + b'TPE3': 'other.conductor', b'TP3': 'other.conductor', + b'TEXT': 'other.lyricist', b'TXT': 'other.lyricist', + b'TSST': 'other.set_subtitle', + b'TENC': 'other.encoded_by', b'TEN': 'other.encoded_by', + b'TSSE': 'other.encoder_settings', b'TSS': 'other.encoder_settings', + b'TMED': 'other.media', b'TMT': 'other.media', + b'WCOP': 'other.license', + b'MVNM': 'other.movement_name', + b'MVIN': 'other.movement', + b'GRP1': 'modern_grouping', b'GP1': 'modern_grouping', + b'TIT1': 'legacy_grouping', b'TT1': 'legacy_grouping', + } + _ID3_MAPPING_CUSTOM = { + 'artists': 'artist', + 'director': 'other.director', + 'license': 'other.license', + 'barcode': 'other.barcode', + 'catalognumber': 'other.catalog_number', + 'showmovement': 'other.show_movement' + } + _EMPTY_FRAME_IDS = {b'\x00\x00\x00\x00', b'\x00\x00\x00'} + _IMAGE_FRAME_IDS = {b'APIC', b'PIC'} + _CUSTOM_FRAME_IDS = {b'TXXX', b'TXX'} + _SYNCED_LYRICS_FRAME_IDS = {b'SYLT', b'SLT'} + _IGNORED_FRAME_IDS = { + b'AENC', b'CRA', + b'APIC', b'PIC', + b'ASPI', + b'ATXT', + b'CHAP', + b'COMR', + b'CRM', + b'CTOC', + b'ENCR', + b'EQU2', b'EQU', + b'ETCO', b'ETC', + b'GEOB', b'GEO', + b'GRID', + b'LINK', b'LNK', + b'MCDI', b'MCI', + b'MLLT', b'MLL', + b'PCNT', b'CNT', + b'POPM', b'POP', + b'POSS', + b'PRIV', + b'RBUF', b'BUF', + b'RGAD', + b'RVA2', b'RVA', + b'RVRB', b'REV', + b'SEEK', + b'SIGN', + b'SYTC', b'STC', + } + _ID3V1_TAG_SIZE = 128 + _MAX_ESTIMATION_SEC = 30.0 + _CBR_DETECTION_FRAME_COUNT = 5 + _USE_XING_HEADER = True # much faster, but can be deactivated for testing + + _ID3V1_GENRES = ( + 'Blues', 'Classic Rock', 'Country', 'Dance', 'Disco', + 'Funk', 'Grunge', 'Hip-Hop', 'Jazz', 'Metal', 'New Age', 'Oldies', + 'Other', 'Pop', 'R&B', 'Rap', 'Reggae', 'Rock', 'Techno', 'Industrial', + 'Alternative', 'Ska', 'Death Metal', 'Pranks', 'Soundtrack', + 'Euro-Techno', 'Ambient', 'Trip-Hop', 'Vocal', 'Jazz+Funk', 'Fusion', + 'Trance', 'Classical', 'Instrumental', 'Acid', 'House', 'Game', + 'Sound Clip', 'Gospel', 'Noise', 'AlternRock', 'Bass', 'Soul', 'Punk', + 'Space', 'Meditative', 'Instrumental Pop', 'Instrumental Rock', + 'Ethnic', 'Gothic', 'Darkwave', 'Techno-Industrial', 'Electronic', + 'Pop-Folk', 'Eurodance', 'Dream', 'Southern Rock', 'Comedy', 'Cult', + 'Gangsta', 'Top 40', 'Christian Rap', 'Pop/Funk', 'Jungle', + 'Native American', 'Cabaret', 'New Wave', 'Psychadelic', 'Rave', + 'Showtunes', 'Trailer', 'Lo-Fi', 'Tribal', 'Acid Punk', 'Acid Jazz', + 'Polka', 'Retro', 'Musical', 'Rock & Roll', 'Hard Rock', + # Wimamp Extended Genres + 'Folk', 'Folk-Rock', 'National Folk', 'Swing', 'Fast Fusion', 'Bebob', + 'Latin', 'Revival', 'Celtic', 'Bluegrass', 'Avantgarde', 'Gothic Rock', + 'Progressive Rock', 'Psychedelic Rock', 'Symphonic Rock', 'Slow Rock', + 'Big Band', 'Chorus', 'Easy listening', 'Acoustic', 'Humour', 'Speech', + 'Chanson', 'Opera', 'Chamber Music', 'Sonata', 'Symphony', + 'Booty Bass', 'Primus', 'Porn Groove', 'Satire', 'Slow Jam', 'Club', + 'Tango', 'Samba', 'Folklore', 'Ballad', 'Power Ballad', + 'Rhythmic Soul', 'Freestyle', 'Duet', 'Punk Rock', 'Drum Solo', + 'A capella', 'Euro-House', 'Dance Hall', 'Goa', 'Drum & Bass', + 'Club-House', 'Hardcore Techno', 'Terror', 'Indie', 'BritPop', + 'Afro-Punk', 'Polsk Punk', 'Beat', 'Christian Gangsta Rap', + 'Heavy Metal', 'Black Metal', 'Contemporary Christian', + 'Christian Rock', + # WinAmp 1.91 + 'Merengue', 'Salsa', 'Thrash Metal', 'Anime', 'Jpop', 'Synthpop', + # WinAmp 5.6 + 'Abstract', 'Art Rock', 'Baroque', 'Bhangra', 'Big Beat', 'Breakbeat', + 'Chillout', 'Downtempo', 'Dub', 'EBM', 'Eclectic', 'Electro', + 'Electroclash', 'Emo', 'Experimental', 'Garage', 'Illbient', + 'Industro-Goth', 'Jam Band', 'Krautrock', 'Leftfield', 'Lounge', + 'Math Rock', 'New Romantic', 'Nu-Breakz', 'Post-Punk', 'Post-Rock', + 'Psytrance', 'Shoegaze', 'Space Rock', 'Trop Rock', 'World Music', + 'Neoclassical', 'Audiobook', 'Audio Theatre', 'Neue Deutsche Welle', + 'Podcast', 'Indie Rock', 'G-Funk', 'Dubstep', 'Garage Rock', + 'Psybient', + ) + _ID3V2_2_IMAGE_FORMATS = { + b'bmp': 'image/bmp', + b'jpg': 'image/jpeg', + b'png': 'image/png', + } + _IMAGE_TYPES = ( + 'other.generic', + 'other.icon', + 'other.alt_icon', + 'front_cover', + 'back_cover', + 'other.leaflet', + 'media', + 'other.lead_artist', + 'other.artist', + 'other.conductor', + 'other.band', + 'other.composer', + 'other.lyricist', + 'other.recording_location', + 'other.during_recording', + 'other.during_performance', + 'other.screen_capture', + 'other.bright_colored_fish', + 'other.illustration', + 'other.band_logo', + 'other.publisher_logo', + ) + _UNKNOWN_IMAGE_TYPE = 'other.unknown' + + # see this page for the magic values used in mp3: + # http://www.mpgedit.org/mpgedit/mpeg_format/mpeghdr.htm + _SAMPLE_RATES = ( + (11025, 12000, 8000), # MPEG 2.5 + (0, 0, 0), # reserved + (22050, 24000, 16000), # MPEG 2 + (44100, 48000, 32000), # MPEG 1 + ) + _V1L1 = (0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, + 448, 0) + _V1L2 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, + 384, 0) + _V1L3 = (0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, + 320, 0) + _V2L1 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, + 256, 0) + _V2L2 = (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160, 0) + _V2L3 = _V2L2 + _NONE = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + _BITRATE_VERSION_LAYERS = ( + # note that layers go from 3 to 1 by design, first layer id is reserved + (_NONE, _V2L3, _V2L2, _V2L1), # MPEG Version 2.5 + (_NONE, _NONE, _NONE, _NONE), # reserved + (_NONE, _V2L3, _V2L2, _V2L1), # MPEG Version 2 + (_NONE, _V1L3, _V1L2, _V1L1), # MPEG Version 1 + ) + _SAMPLES_PER_FRAME = 1152 # the default frame size for mp3 + _CHANNELS_PER_CHANNEL_MODE = ( + 2, # 00 Stereo + 2, # 01 Joint stereo (Stereo) + 2, # 10 Dual channel (2 mono channels) + 1, # 11 Single channel (Mono) + ) + + def __init__(self) -> None: + super().__init__() + # save position after the ID3 tag for duration measurement speedup + self._bytepos_after_id3v2 = -1 + self._modern_grouping_values: list[str] = [] + self._legacy_grouping_values: list[str] = [] + + @staticmethod + def _parse_xing_header(fh: BinaryIO) -> tuple[int, int]: + # see: http://www.mp3-tech.org/programmer/sources/vbrheadersdk.zip + fh.seek(4, SEEK_CUR) # read over Xing header + header_flags = unpack('>i', fh.read(4))[0] + frames = byte_count = 0 + if header_flags & 1: # FRAMES FLAG + frames = unpack('>i', fh.read(4))[0] + if header_flags & 2: # BYTES FLAG + byte_count = unpack('>i', fh.read(4))[0] + if header_flags & 4: # TOC FLAG + fh.seek(100, SEEK_CUR) + if header_flags & 8: # VBR SCALE FLAG + fh.seek(4, SEEK_CUR) + return frames, byte_count + + def _determine_duration(self, fh: BinaryIO) -> None: + # if tag reading was disabled, find start position of audio data + if self._bytepos_after_id3v2 == -1: + self._parse_id3v2_header(fh) + + max_estimation_frames = ( + (self._MAX_ESTIMATION_SEC * 44100) // self._SAMPLES_PER_FRAME) + frame_size_accu = 0 + audio_offset = self._bytepos_after_id3v2 + frames = 0 # count frames for determining mp3 duration + bitrate_accu = 0 # add up bitrates to find average bitrate to detect + last_bitrates = set() # CBR mp3s (multiple frames with same bitrates) + # seek to first position after id3 tag (speedup for large header) + first_mpeg_id = None + fh.seek(self._bytepos_after_id3v2) + while True: + # reading through garbage until 11 '1' sync-bits are found + header = fh.read(4) + header_len = len(header) + if header_len < 4: + if frames: + self.bitrate = bitrate_accu / frames + break # EOF + _sync, conf, bitrate_freq, rest = unpack('4B', header) + br_id = (bitrate_freq >> 4) & 0x0F # biterate id + sr_id = (bitrate_freq >> 2) & 0x03 # sample rate id + padding = 1 if bitrate_freq & 0x02 > 0 else 0 + mpeg_id = (conf >> 3) & 0x03 + layer_id = (conf >> 1) & 0x03 + channel_mode = (rest >> 6) & 0x03 + # check for eleven 1s, validate bitrate and sample rate + if (header[:2] <= b'\xFF\xE0' + or (first_mpeg_id is not None and first_mpeg_id != mpeg_id) + or br_id > 14 or br_id == 0 or sr_id == 3 or layer_id == 0 + or mpeg_id == 1): + # invalid frame, find next sync header + idx = header.find(b'\xFF', 1) + next_offset = header_len + if idx != -1: + next_offset -= idx + fh.seek(idx - header_len, SEEK_CUR) + if frames == 0: + audio_offset += next_offset + continue + if first_mpeg_id is None: + first_mpeg_id = mpeg_id + self.channels = self._CHANNELS_PER_CHANNEL_MODE[channel_mode] + frame_br = self._BITRATE_VERSION_LAYERS[mpeg_id][layer_id][br_id] + self.samplerate = samplerate = self._SAMPLE_RATES[mpeg_id][sr_id] + frame_length = (144000 * frame_br) // samplerate + padding + # There might be a xing header in the first frame that contains + # all the info we need, otherwise parse multiple frames to find the + # accurate average bitrate + if frames == 0 and self._USE_XING_HEADER: + prev_offset = header_len + audio_offset + frame_content = fh.read(frame_length) + xing_header_offset = frame_content.find(b'Xing') + if xing_header_offset != -1: + fh.seek(prev_offset + xing_header_offset) + xframes, byte_count = self._parse_xing_header(fh) + if xframes > 0 and byte_count > 0: + # MPEG-2 Audio Layer III uses 576 samples per frame + samples_pf = self._SAMPLES_PER_FRAME + if mpeg_id <= 2: + samples_pf = 576 + self.duration = dur = xframes * samples_pf / samplerate + self.bitrate = byte_count * 8 / dur / 1000 + return + fh.seek(prev_offset) + + frames += 1 # it's most probably a mp3 frame + bitrate_accu += frame_br + if frames <= self._CBR_DETECTION_FRAME_COUNT: + last_bitrates.add(frame_br) + + frame_size_accu += frame_length + # if bitrate does not change over time its probably CBR + is_cbr = (frames == self._CBR_DETECTION_FRAME_COUNT + and len(last_bitrates) == 1) + if frames == max_estimation_frames or is_cbr: + # try to estimate duration + stream_size = ( + self.filesize - audio_offset - self._ID3V1_TAG_SIZE) + est_frame_count = stream_size / (frame_size_accu / frames) + samples = est_frame_count * self._SAMPLES_PER_FRAME + self.duration = samples / samplerate + self.bitrate = bitrate_accu / frames + return + + if frame_length > 1: # jump over current frame body + fh.seek(frame_length - header_len, SEEK_CUR) + if self.samplerate: + self.duration = frames * self._SAMPLES_PER_FRAME / self.samplerate + + def _parse_tag(self, fh: BinaryIO) -> None: + self._parse_id3v2(fh) + if self.filesize >= self._ID3V1_TAG_SIZE: + # try parsing id3v1 at the end of file + fh.seek(self.filesize - self._ID3V1_TAG_SIZE) + self._parse_id3v1(fh) + + def _parse_id3v2_header(self, fh: BinaryIO) -> tuple[int, bool, int]: + size = major = 0 + extended = False + # for info on the specs, see: http://id3.org/Developer%20Information + header = fh.read(10) + # check if there is an ID3v2 tag at the beginning of the file + if header.startswith(b'ID3'): + major = header[3] + if _DEBUG: + print(f'Found id3 v2.{major}') + extended = (header[5] & 0x40) > 0 + size = self._unsynchsafe(unpack('4B', header[6:10])) + self._bytepos_after_id3v2 = size + return size, extended, major + + def _parse_id3v2(self, fh: BinaryIO) -> None: + size, extended, major = self._parse_id3v2_header(fh) + if size <= 0: + return + end_pos = fh.tell() + size + parsed_size = 0 + if extended: # just read over the extended header. + extd_size = self._unsynchsafe(unpack('4B', fh.read(6)[:4])) + fh.seek(extd_size - 6, SEEK_CUR) # jump over extended_header + while parsed_size < size: + frame_size = self._parse_frame(fh, size, id3version=major) + if frame_size == -1: + break + parsed_size += frame_size + fh.seek(end_pos) + self._set_grouping_work_fields() + + def _parse_id3v1(self, fh: BinaryIO) -> None: + content = fh.read(3 + 30 + 30 + 30 + 4 + 30 + 1) + if content[:3] != b'TAG': # check if this is an ID3 v1 tag + return + + def asciidecode(x: bytes) -> str: + return self._unpad( + x.decode(self._default_encoding or 'latin1', 'replace')) + # Only set fields that were not set by ID3v2 tags, as ID3v1 + # tags are more likely to be outdated or have encoding issues + if not self.title: + value = asciidecode(content[3:33]) + if value: + self._set_field('title', value) + if not self.artist: + value = asciidecode(content[33:63]) + if value: + self._set_field('artist', value) + if not self.album: + value = asciidecode(content[63:93]) + if value: + self._set_field('album', value) + if not self.year: + value = asciidecode(content[93:97]) + if value: + self._set_field('year', value) + comment = content[97:127] + if b'\x00\x00' < comment[-2:] < b'\x01\x00': + if self.track is None: + self._set_field('track', ord(comment[-1:])) + comment = comment[:-2] + if not self.comment: + value = asciidecode(comment) + if value: + self._set_field('comment', value) + if not self.genre: + genre_id = ord(content[127:128]) + if genre_id < len(self._ID3V1_GENRES): + self._set_field('genre', self._ID3V1_GENRES[genre_id]) + + def _parse_custom_field(self, content: str) -> bool: + custom_field_name, separator, value = content.partition('\x00') + custom_field_name_lower = custom_field_name.lower() + value = value.lstrip('\ufeff') + if custom_field_name_lower and separator and value: + field_name = self._ID3_MAPPING_CUSTOM.get( + custom_field_name_lower, + self._OTHER_PREFIX + custom_field_name_lower) + self._set_field(field_name, value) + return True + return False + + def _set_grouping_work_fields(self) -> None: + # iTunes 12.5.4.42 added a new GRP1 frame for 'grouping', and + # repurposed the TIT1 frame for 'work'. Handle this mess here. + if self._modern_grouping_values: + for value in self._modern_grouping_values: + self._set_field('other.grouping', value) + for value in self._legacy_grouping_values: + self._set_field('other.work', value) + return + for value in self._legacy_grouping_values: + self._set_field('other.grouping', value) + + @classmethod + def _create_tag_image(cls, + data: bytes, + pic_type: int, + mime_type: str | None = None, + description: str | None = None) -> tuple[str, Image]: + field_name = cls._UNKNOWN_IMAGE_TYPE + if 0 <= pic_type <= len(cls._IMAGE_TYPES): + field_name = cls._IMAGE_TYPES[pic_type] + name = field_name + if field_name.startswith(cls._OTHER_PREFIX): + name = field_name[len(cls._OTHER_PREFIX):] + image = Image(name, data) + if mime_type: + image.mime_type = mime_type + if description: + image.description = description + return field_name, image + + def _parse_image(self, + frame_id: bytes, + content: bytes) -> tuple[str, Image]: + # See section 4.14: http://id3.org/id3v2.4.0-frames + encoding = content[:1] + if frame_id == b'PIC': # ID3 v2.2: + imgformat = content[1:4].lower() + mime_type = self._ID3V2_2_IMAGE_FORMATS.get(imgformat) + # skip encoding (1), imgformat (3), pictype(1) + desc_start_pos = 5 + else: # ID3 v2.3+ + mime_start_pos = 1 + mime_end_pos = self._find_string_end_pos( + content, start_pos=mime_start_pos) + mime_type = self._decode_string( + content[mime_start_pos:mime_end_pos]).lower() + # skip mtype, pictype(1) + desc_start_pos = mime_end_pos + 1 + pic_type = content[desc_start_pos - 1] + desc_end_pos = self._find_string_end_pos( + content, encoding, desc_start_pos) + # skip stray null byte in broken file + if (desc_end_pos + 1 < len(content) + and content[desc_end_pos] == 0 + and content[desc_end_pos + 1] != 0): + desc_end_pos += 1 + desc = self._decode_string( + encoding + content[desc_start_pos:desc_end_pos]) + return self._create_tag_image( + content[desc_end_pos:], pic_type, mime_type, desc) + + @staticmethod + def _lrc_timestamp(seconds: float) -> str: + cs = int(seconds * 100) + minutes, cs = divmod(cs, 6000) + seconds, cs = divmod(cs, 100) + return f"{minutes:02d}:{seconds:02d}.{cs:02d}" + + def _parse_synced_lyrics(self, content: bytes) -> str: + # Convert ID3 synced lyrics to LRC format + content_length = len(content) + encoding = content[:1] + # skip language (3) + timestamp_format = content[4:5] + # skip content type (1) + start_pos = 6 + end_pos = self._find_string_end_pos(content, encoding, start_pos) + lyrics = "" + offset = end_pos + found_line = False + while offset < content_length: + end_pos = self._find_string_end_pos(content, encoding, offset) + value = self._decode_string( + encoding + content[offset:end_pos]).lstrip('\n') + offset = end_pos + time = unpack('>I', content[offset:offset + 4])[0] + offset += 4 + if found_line: + lyrics += '\n' + found_line = True + if timestamp_format == b'\x02': + # time in milliseconds + timestamp = self._lrc_timestamp(time / 1000) + else: + lyrics += value + continue + lyrics += f'[{timestamp}]{value}' + return lyrics + + def _parse_frame(self, + fh: BinaryIO, + total_size: int, + id3version: int | None = None) -> int: + # ID3v2.2 especially ugly. see: http://id3.org/id3v2-00 + header_len = 6 if id3version == 2 else 10 + frame_size_bytes = 3 if id3version == 2 else 4 + is_synchsafe_int = id3version == 4 + header = fh.read(header_len) + if len(header) != header_len: + return -1 + frame_id = header[:frame_size_bytes] + if frame_id in self._EMPTY_FRAME_IDS: + return -1 + frame_size: int + if frame_size_bytes == 3: + frame_size = unpack('>I', b'\x00' + header[3:6])[0] + elif is_synchsafe_int: + frame_size = self._unsynchsafe(unpack('4B', header[4:8])) + else: + frame_size = unpack('>I', header[4:8])[0] + if _DEBUG: + print(f'Found id3 Frame {frame_id!r} at ' + f'{fh.tell()}-{fh.tell() + frame_size} of {self.filesize}') + if frame_size > total_size: + # invalid frame size, stop here + return -1 + should_set_field = True + if self._parse_tags and frame_id in self._ID3_MAPPING: + fieldname = self._ID3_MAPPING[frame_id] + language = fieldname in {'comment', 'other.lyrics'} + value = self._decode_string(fh.read(frame_size), language) + if not value: + return frame_size + if fieldname == "comment": + # check if comment is a key-value pair (used by iTunes) + should_set_field = not self._parse_custom_field(value) + elif fieldname in {'track', 'disc', 'other.movement'}: + if '/' in value: + value, total = value.split('/')[:2] + if total.isdecimal(): + self._set_field(f'{fieldname}_total', int(total)) + if value.isdecimal(): + self._set_field(fieldname, int(value)) + should_set_field = False + elif fieldname == 'genre': + genre_id = 255 + # funky: id3v1 genre hidden in a id3v2 field + if value.isdecimal(): + genre_id = int(value) + # funkier: the TCO may contain genres in parens, e.g '(13)' + elif value.startswith('('): + end_pos = value.find(')') + parens_text = value[1:end_pos] + if end_pos > 0 and parens_text.isdecimal(): + genre_id = int(parens_text) + if 0 <= genre_id < len(self._ID3V1_GENRES): + value = self._ID3V1_GENRES[genre_id] + elif fieldname == 'modern_grouping': + self._modern_grouping_values.append(value) + should_set_field = False + elif fieldname == 'legacy_grouping': + self._legacy_grouping_values.append(value) + should_set_field = False + if should_set_field: + self._set_field(fieldname, value) + elif self._parse_tags and frame_id in self._SYNCED_LYRICS_FRAME_IDS: + lyrics = self._parse_synced_lyrics(fh.read(frame_size)) + self._set_field('other.lyrics', lyrics) + elif self._parse_tags and frame_id in self._CUSTOM_FRAME_IDS: + # custom fields + value = self._decode_string(fh.read(frame_size)) + if value: + self._parse_custom_field(value) + elif self._parse_tags and frame_id not in self._IGNORED_FRAME_IDS: + # unknown, try to add to other dict + value = self._decode_string(fh.read(frame_size)) + if value: + self._set_field( + self._OTHER_PREFIX + frame_id.decode('latin-1').lower(), + value) + elif self._load_image and frame_id in self._IMAGE_FRAME_IDS: + field_name, image = self._parse_image( + frame_id, fh.read(frame_size)) + # pylint: disable=protected-access + self.images._set_field(field_name, image) + else: # skip frame + fh.seek(frame_size, SEEK_CUR) + return frame_size + + @staticmethod + def _find_string_end_pos(content: bytes, + encoding: bytes = b'\x00', + start_pos: int = 0) -> int: + # latin1 and utf-8 are 1 byte + if encoding in {b'\x00', b'\x03'}: + return content.find(b'\x00', start_pos) + 1 + end_pos = 0 + for i in range(start_pos, len(content), 2): + if content[i:i + 2] == b'\x00\x00': + end_pos = i + 2 + break + return end_pos + + def _decode_string(self, value: bytes, language: bool = False) -> str: + default_encoding = 'ISO-8859-1' + if self._default_encoding: + default_encoding = self._default_encoding + # it's not my fault, this is the spec. + first_byte = value[:1] + if first_byte == b'\x00': # ISO-8859-1 + value = value[1:] + encoding = default_encoding + elif first_byte == b'\x01': # UTF-16 with BOM + value = value[1:] + # remove language (but leave BOM) + if language: + if value[3:5] in {b'\xfe\xff', b'\xff\xfe'}: + value = value[3:] + if value[:3].isalpha(): + value = value[3:] # remove language + # strip optional additional null bytes + value = value.lstrip(b'\x00') + # read byte order mark to determine endianness + encoding = ('UTF-16be' if value.startswith(b'\xfe\xff') + else 'UTF-16le') + # strip the bom if it exists + if value.startswith(b'\xfe\xff') or value.startswith(b'\xff\xfe'): + value = value[2:] if len(value) % 2 == 0 else value[2:-1] + # remove ADDITIONAL OTHER BOM :facepalm: + if value.startswith(b'\x00\x00\xff\xfe'): + value = value[4:] + elif first_byte == b'\x02': # UTF-16 without BOM + # strip optional null byte, if byte count uneven + value = value[1:-1] if len(value) % 2 == 0 else value[1:] + encoding = 'UTF-16be' + elif first_byte == b'\x03': # UTF-8 + value = value[1:] + encoding = 'UTF-8' + else: + encoding = default_encoding # wild guess + if language and value[:3].isalpha(): + value = value[3:] # remove language + return self._unpad(value.decode(encoding, 'replace')) + + @staticmethod + def _unsynchsafe(ints: tuple[int, ...]) -> int: + return (ints[0] << 21) + (ints[1] << 14) + (ints[2] << 7) + ints[3] + + +class _Ogg(TinyTag): + """OGG Parser.""" + + _VORBIS_MAPPING = { + 'album': 'album', + 'albumartist': 'albumartist', + 'title': 'title', + 'artist': 'artist', + 'artists': 'artist', + 'author': 'artist', + 'date': 'year', + 'tracknumber': 'track', + 'tracktotal': 'track_total', + 'totaltracks': 'track_total', + 'discnumber': 'disc', + 'disctotal': 'disc_total', + 'totaldiscs': 'disc_total', + 'genre': 'genre', + 'description': 'comment', + 'comment': 'comment', + 'comments': 'comment', + 'composer': 'composer', + 'bpm': 'other.bpm', + 'copyright': 'other.copyright', + 'isrc': 'other.isrc', + 'lyrics': 'other.lyrics', + 'unsyncedlyrics': 'other.lyrics', + 'publisher': 'other.publisher', + 'language': 'other.language', + 'director': 'other.director', + 'website': 'other.url', + 'conductor': 'other.conductor', + 'lyricist': 'other.lyricist', + 'discsubtitle': 'other.set_subtitle', + 'setsubtitle': 'other.set_subtitle', + 'initialkey': 'other.initial_key', + 'key': 'other.initial_key', + 'encodedby': 'other.encoded_by', + 'encodersettings': 'other.encoder_settings', + 'media': 'other.media', + 'license': 'other.license', + 'barcode': 'other.barcode', + 'catalognumber': 'other.catalog_number', + 'movementname': 'other.movement_name', + 'movement': 'other.movement', + 'movementtotal': 'other.movement_total', + 'showmovement': 'other.show_movement', + 'grouping': 'other.grouping', + 'contentgroup': 'other.grouping', + 'work': 'other.work' + } + + def __init__(self) -> None: + super().__init__() + self._granule_pos = 0 + self._pre_skip = 0 # number of samples to skip in opus stream + self._audio_size: int | None = None # size of opus audio stream + + def _determine_duration(self, fh: BinaryIO) -> None: + if not self._tags_parsed: + self._parse_tag(fh) # determine sample rate + if self.duration is not None or not self.samplerate: + return # either ogg flac or invalid file + self.duration = max( + (self._granule_pos - self._pre_skip) / self.samplerate, 0 + ) + if self._audio_size is None or not self.duration: + return # not an opus file + self.bitrate = self._audio_size * 8 / self.duration / 1000 + + def _parse_tag(self, fh: BinaryIO) -> None: + check_flac_second_packet = False + check_speex_second_packet = False + for packet in self._parse_pages(fh): + if packet.startswith(b"\x01vorbis"): + if self._parse_duration: + self.channels, self.samplerate = unpack( + " None: + # for the spec, see: http://xiph.org/vorbis/doc/v-comment.html + # discnumber tag based on: https://en.wikipedia.org/wiki/Vorbis_comment + # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/Vorbis.html + if has_vendor: + vendor_length = unpack('I', fh.read(4))[0] + fh.seek(vendor_length, SEEK_CUR) # jump over vendor + elements = unpack('I', fh.read(4))[0] + for _i in range(elements): + length = unpack('I', fh.read(4))[0] + keyvalpair = fh.read(length).decode('utf-8', 'replace') + if '=' in keyvalpair: + key, value = keyvalpair.split('=', 1) + key_lower = key.lower() + if key_lower == "metadata_block_picture": + if self._load_image: + if _DEBUG: + print('Found Vorbis Image', key, value[:64]) + # pylint: disable=protected-access + fieldname, fieldvalue = _Flac._parse_image( + BytesIO(a2b_base64(value))) + self.images._set_field(fieldname, fieldvalue) + else: + if _DEBUG: + print('Found Vorbis Comment', key, value[:64]) + fieldname = self._VORBIS_MAPPING.get( + key_lower, self._OTHER_PREFIX + key_lower) + if fieldname in { + 'track', 'disc', 'track_total', 'disc_total' + }: + if fieldname in {'track', 'disc'} and '/' in value: + value, total = value.split('/')[:2] + if total.isdecimal(): + self._set_field( + f'{fieldname}_total', int(total)) + if value.isdecimal(): + self._set_field(fieldname, int(value)) + elif value: + self._set_field(fieldname, value) + + def _parse_pages(self, fh: BinaryIO) -> Iterator[bytearray]: + # for the spec, see: https://wiki.xiph.org/Ogg + packet_data = bytearray() + current_serial = None + last_granule_pos = 0 + last_audio_size = 0 + header_len = 27 + page_header = fh.read(header_len) # read ogg page header + while len(page_header) == header_len: + version = page_header[4] + if page_header[:4] != b'OggS' or version != 0: + raise ParseError('Invalid OGG header') + # https://xiph.org/ogg/doc/framing.html + header_type = page_header[5] + eos = header_type & 0x04 + granule_pos, serial = unpack(' 0: + if eos: + self._granule_pos = granule_pos + else: + self._granule_pos = last_granule_pos + last_granule_pos = granule_pos + segments = page_header[26] + seg_sizes = unpack('B' * segments, fh.read(segments)) + read_size = 0 + audio_size = 0 + for seg_size in seg_sizes: # read all segments + read_size += seg_size + if self._audio_size is not None: + audio_size += seg_size + # less than 255 bytes means end of packet + if seg_size < 255 and serial_match and not self._tags_parsed: + packet_data += fh.read(read_size) + yield packet_data + packet_data.clear() + read_size = 0 + if read_size: + if not serial_match or self._tags_parsed: + fh.seek(read_size, SEEK_CUR) + else: # packet continues on next page + packet_data += fh.read(read_size) + if serial_match and self._audio_size is not None: + if eos: + self._audio_size += last_audio_size + audio_size + else: + self._audio_size += last_audio_size + last_audio_size = audio_size + if eos: + break + page_header = fh.read(header_len) + + +class _Wave(TinyTag): + """WAVE Parser. + + https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html + """ + + _RIFF_MAPPING = { + b'INAM': 'title', + b'TITL': 'title', + b'IPRD': 'album', + b'IART': 'artist', + b'IBPM': 'other.bpm', + b'ICMT': 'comment', + b'IMUS': 'composer', + b'ICOP': 'other.copyright', + b'ICRD': 'year', + b'IGNR': 'genre', + b'ILNG': 'other.language', + b'ISRC': 'other.isrc', + b'IPUB': 'other.publisher', + b'IPRT': 'track', + b'ITRK': 'track', + b'TRCK': 'track', + b'IBSU': 'other.url', + b'YEAR': 'year', + b'IWRI': 'other.lyricist', + b'IENC': 'other.encoded_by', + b'IMED': 'other.media', + } + + def _determine_duration(self, fh: BinaryIO) -> None: + if not self._tags_parsed: + self._parse_tag(fh) + + def _parse_tag(self, fh: BinaryIO) -> None: + # http://www-mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html + # https://en.wikipedia.org/wiki/WAV + header = fh.read(12) + if header[:4] != b'RIFF' or header[8:12] != b'WAVE': + raise ParseError('Invalid WAV header') + if self._parse_duration: + self.bitdepth = 16 # assume 16bit depth (CD quality) + header_len = 8 + chunk_header = fh.read(header_len) + while len(chunk_header) == header_len: + subchunk_id = chunk_header[:4] + subchunk_size = unpack('I', chunk_header[4:])[0] + # IFF chunks are padded to an even number of bytes + subchunk_size += subchunk_size % 2 + if self._parse_duration and subchunk_id == b'fmt ': + chunk = fh.read(subchunk_size) + _format_tag, channels, samplerate = unpack(' None: + if not self._tags_parsed: + self._parse_tag(fh) + + def _parse_tag(self, fh: BinaryIO) -> None: + id3 = None + header = fh.read(4) + if header.startswith(b'ID3'): # parse ID3 header if it exists + fh.seek(-4, SEEK_CUR) + # pylint: disable=protected-access + id3 = _ID3() + id3._parse_tags = self._parse_tags + id3._load_image = self._load_image + id3._parse_id3v2(fh) + header = fh.read(4) # after ID3 should be fLaC + if header[:4] != b'fLaC': + raise ParseError('Invalid FLAC header') + # for spec, see https://xiph.org/flac/ogg_mapping.html + header_len = 4 + block_header = fh.read(header_len) + while len(block_header) == header_len: + block_type = block_header[0] & 0x7f + is_last_block = block_header[0] & 0x80 + size = unpack('>I', b'\x00' + block_header[1:])[0] + # http://xiph.org/flac/format.html#metadata_block_streaminfo + if self._parse_duration and block_type == self._STREAMINFO: + head = fh.read(size) + if len(head) < 34: # invalid streaminfo + break + # From the xiph documentation: + # py | + # ---------------------------------------------- + # H | <16> The minimum block size (in samples) + # H | <16> The maximum block size (in samples) + # 3s | <24> The minimum frame size (in bytes) + # 3s | <24> The maximum frame size (in bytes) + # 8B | <20> Sample rate in Hz. + # | <3> (number of channels)-1. + # | <5> (bits per sample)-1. + # | <36> Total samples in stream. + # 16s| <128> MD5 signature + # channels--. bits total samples + # |----- samplerate -----| |-||----| |---------~ ~----| + # 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 + # #---4---# #---5---# #---6---# #---7---# #--8-~ ~-12-# + sr = unpack('>I', b'\x00' + head[10:13])[0] >> 4 + self.channels = ((head[12] >> 1) & 0x07) + 1 + self.bitdepth = ( + ((head[12] & 1) << 4) + ((head[13] & 0xF0) >> 4) + 1) + tot_samples_b = bytes([head[13] & 0x0F]) + head[14:18] + tot_samples = unpack('>Q', b'\x00\x00\x00' + tot_samples_b)[0] + self.duration = duration = tot_samples / sr + self.samplerate = sr + if duration > 0: + self.bitrate = self.filesize * 8 / duration / 1000 + elif self._parse_tags and block_type == self._VORBIS_COMMENT: + # pylint: disable=protected-access + walker = BytesIO(fh.read(size)) + oggtag = _Ogg() + oggtag._parse_vorbis_comment(walker) + self._update(oggtag) + elif self._load_image and block_type == self._PICTURE: + fieldname, value = self._parse_image(fh) + # pylint: disable=protected-access + self.images._set_field(fieldname, value) + else: + fh.seek(size, SEEK_CUR) # seek over this block + if is_last_block: + break + block_header = fh.read(header_len) + if id3 is not None: # apply ID3 tags after vorbis + self._update(id3) + self._tags_parsed = True + + @classmethod + def _parse_image(cls, fh: BinaryIO) -> tuple[str, Image]: + # https://xiph.org/flac/format.html#metadata_block_picture + pic_type, mime_type_len = unpack('>II', fh.read(8)) + mime_type = fh.read(mime_type_len).decode('utf-8', 'replace') + description_len = unpack('>I', fh.read(4))[0] + description = fh.read(description_len).decode('utf-8', 'replace') + fh.seek(16, SEEK_CUR) # jump over width, height, depth, colors + pic_len = unpack('>I', fh.read(4))[0] + # pylint: disable=protected-access + return _ID3._create_tag_image( + fh.read(pic_len), pic_type, mime_type, description) + + +class _Wma(TinyTag): + """WMA Parser. + + http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx + http://uguisu.skr.jp/Windows/format_asf.html + """ + + _ASF_MAPPING = { + 'WM/ARTISTS': 'artist', + 'WM/TrackNumber': 'track', + 'WM/PartOfSet': 'disc', + 'WM/Year': 'year', + 'WM/AlbumArtist': 'albumartist', + 'WM/Genre': 'genre', + 'WM/AlbumTitle': 'album', + 'WM/Composer': 'composer', + 'WM/Publisher': 'other.publisher', + 'WM/BeatsPerMinute': 'other.bpm', + 'WM/InitialKey': 'other.initial_key', + 'WM/Lyrics': 'other.lyrics', + 'WM/Language': 'other.language', + 'WM/Director': 'other.director', + 'WM/AuthorURL': 'other.url', + 'WM/ISRC': 'other.isrc', + 'WM/Conductor': 'other.conductor', + 'WM/Writer': 'other.lyricist', + 'WM/SetSubTitle': 'other.set_subtitle', + 'WM/EncodedBy': 'other.encoded_by', + 'WM/EncodingSettings': 'other.encoder_settings', + 'WM/Media': 'other.media', + 'WM/Barcode': 'other.barcode', + 'WM/CatalogNo': 'other.catalog_number', + 'WM/ContentGroupDescription': 'other.grouping', + 'WM/Work': 'other.work' + } + _UNPACK_FORMATS = { + 1: ' None: + if not self._tags_parsed: + self._parse_tag(fh) + + def _parse_tag(self, fh: BinaryIO) -> None: + # http://www.garykessler.net/library/file_sigs.html + # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc521913958 + header = fh.read(30) + if (header[:16] != b'0&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel' + or header[-1:] != b'\x02'): + raise ParseError('Invalid WMA header') + header_len = 24 + object_header = fh.read(header_len) + while len(object_header) == header_len: + object_size = unpack(' self.filesize: + break # invalid object, stop parsing. + object_id = object_header[:16] + if self._parse_tags and object_id == self._ASF_CONTENT_DESC: + walker = BytesIO(fh.read(object_size - header_len)) + (title_length, author_length, + copyright_length, description_length, + rating_length) = unpack('<5H', walker.read(10)) + data_blocks = { + 'title': title_length, + 'artist': author_length, + 'other.copyright': copyright_length, + 'comment': description_length, + '_rating': rating_length, + } + for i_field_name, length in data_blocks.items(): + value = self._unpad( + walker.read(length).decode('utf-16', 'replace')) + if not i_field_name.startswith('_') and value: + self._set_field(i_field_name, value) + elif self._parse_tags and object_id == self._ASF_EXT_CONTENT_DESC: + # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc509555195 + walker = BytesIO(fh.read(object_size - header_len)) + descriptor_count = unpack(' None: + header = fh.read(12) + if header[:4] != b'FORM' or header[8:12] not in {b'AIFC', b'AIFF'}: + raise ParseError('Invalid AIFF header') + header_len = 8 + chunk_header = fh.read(header_len) + while len(chunk_header) == header_len: + subchunk_id = chunk_header[:4] + subchunk_size = unpack('>I', chunk_header[4:])[0] + # IFF chunks are padded to an even number of bytes + subchunk_size += subchunk_size % 2 + if self._parse_tags and subchunk_id in self._AIFF_MAPPING: + value = self._unpad( + fh.read(subchunk_size).decode('utf-8', 'replace')) + self._set_field(self._AIFF_MAPPING[subchunk_id], value) + elif self._parse_duration and subchunk_id == b'COMM': + chunk = fh.read(subchunk_size) + channels, num_frames, bitdepth = unpack('>hLh', chunk[:8]) + self.channels, self.bitdepth = channels, bitdepth + try: + # Extended precision + exp, mantissa = unpack('>HQ', chunk[8:18]) + sr = int(mantissa * (2 ** (exp - 0x3FFF - 63))) + duration = num_frames / sr + bitrate = sr * channels * bitdepth / 1000 + self.samplerate, self.duration, self.bitrate = ( + sr, duration, bitrate) + except OverflowError: + pass + elif self._parse_tags and subchunk_id in {b'id3 ', b'ID3 '}: + # pylint: disable=protected-access + id3 = _ID3() + id3._filehandler = fh + id3._load(tags=True, duration=False, image=self._load_image) + self._update(id3) + else: # some other chunk, just skip the data + fh.seek(subchunk_size, SEEK_CUR) + chunk_header = fh.read(header_len) + self._tags_parsed = True + + def _determine_duration(self, fh: BinaryIO) -> None: + if not self._tags_parsed: + self._parse_tag(fh) \ No newline at end of file