0
1
mirror of https://github.com/radio95-rnt/RadioPlayer.git synced 2026-02-26 13:52:00 +01:00
Files
RadioPlayer/tinytag.py
2025-12-27 21:45:18 +01:00

2052 lines
87 KiB
Python

# SPDX-FileCopyrightText: 2014-2025 tinytag Contributors
# SPDX-License-Identifier: MIT
# tinytag - an audio file metadata reader
# http://github.com/tinytag/tinytag
# MIT License
# Copyright (c) 2014-2025 Tom Wallroth, Mat (mathiascode), et al.
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
"""Audio file metadata reader."""
from __future__ import annotations
from binascii import a2b_base64
from io import BytesIO
from os import PathLike, SEEK_CUR, SEEK_END, environ, fsdecode
from struct import unpack
TYPE_CHECKING = False
# Lazy imports for type checking
if TYPE_CHECKING:
    from collections.abc import Callable, Iterator  # pylint: disable-all
    from typing import Any, BinaryIO, Dict, List, Union
    # Type aliases used throughout the module; at runtime (the `else` branch)
    # they all collapse to plain `dict`, so `typing` is never imported.
    _StringListDict = Dict[str, List[str]]
    _ImageListDict = Dict[str, List["Image"]]
    # Nested atom-parser tree: maps atom names to either a deeper tree or a
    # callable that parses the atom's payload.
    _DataTreeDict = Dict[
        bytes, Union['_DataTreeDict', Callable[..., Dict[str, Any]]]]
else:
    _StringListDict = _ImageListDict = _DataTreeDict = dict
# some of the parsers can print debug info (set TINYTAG_DEBUG env var)
_DEBUG = bool(environ.get('TINYTAG_DEBUG'))
class TinyTagException(Exception):
    """Base class for all exceptions raised by tinytag."""
class ParseError(TinyTagException):
    """Parsing an audio file failed."""
class UnsupportedFormatError(TinyTagException):
    """File format is not supported."""
class TinyTag:
    """A class containing audio file properties and metadata fields.

    Use the `TinyTag.get()` class method to obtain a populated instance
    for a file; format-specific subclasses implement the actual parsing.
    """

    SUPPORTED_FILE_EXTENSIONS = (
        '.mp1', '.mp2', '.mp3',
        '.oga', '.ogg', '.opus', '.spx',
        '.wav', '.flac', '.wma',
        '.m4b', '.m4a', '.m4r', '.m4v', '.mp4', '.aax', '.aaxc',
        '.aiff', '.aifc', '.aif', '.afc'
    )
    _OTHER_PREFIX = 'other.'
    # Lazily built on first use in _get_parser_for_filename()
    _file_extension_mapping: dict[tuple[str, ...], type[TinyTag]] | None = None

    def __init__(self) -> None:
        self.filename: str | None = None
        self.filesize = 0
        self.duration: float | None = None
        self.channels: int | None = None
        self.bitrate: float | None = None
        self.bitdepth: int | None = None
        self.samplerate: int | None = None
        self.artist: str | None = None
        self.albumartist: str | None = None
        self.composer: str | None = None
        self.album: str | None = None
        self.disc: int | None = None
        self.disc_total: int | None = None
        self.title: str | None = None
        self.track: int | None = None
        self.track_total: int | None = None
        self.genre: str | None = None
        self.year: str | None = None
        self.comment: str | None = None
        self.images = Images()
        self.other: _StringListDict = OtherFields()
        self._filehandler: BinaryIO | None = None
        self._default_encoding: str | None = None  # override for some formats
        self._parse_duration = True
        self._parse_tags = True
        self._load_image = False
        self._tags_parsed = False
        self.__dict__: dict[str, str | float | Images | OtherFields | None]

    @classmethod
    def get(cls,
            filename: bytes | str | PathLike[Any] | None = None,
            file_obj: BinaryIO | None = None,
            tags: bool = True,
            duration: bool = True,
            image: bool = False,
            encoding: str | None = None,
            ignore_errors: bool | None = None) -> TinyTag:
        """Return a tag object for an audio file.

        Either `filename` or `file_obj` must be given. Raises
        UnsupportedFormatError when no parser matches the file, and
        ParseError when reading the file's contents fails.
        """
        should_close_file = file_obj is None
        filename_str = None
        if filename:
            if should_close_file:
                # pylint: disable=consider-using-with
                file_obj = open(filename, 'rb')
            filename_str = fsdecode(filename)
        if file_obj is None:
            raise ValueError(
                'Either filename or file_obj argument is required')
        if ignore_errors is not None:
            # pylint: disable=import-outside-toplevel
            from warnings import warn
            warn('ignore_errors argument is obsolete, and will be removed in '
                 'the future', DeprecationWarning, stacklevel=2)
        try:
            # pylint: disable=protected-access
            file_obj.seek(0, SEEK_END)
            filesize = file_obj.tell()
            file_obj.seek(0)
            parser_class = cls._get_parser_class(filename_str, file_obj)
            tag = parser_class()
            tag._filehandler = file_obj
            tag._default_encoding = encoding
            tag.filename = filename_str
            tag.filesize = filesize
            if filesize > 0:
                try:
                    tag._load(tags=tags, duration=duration, image=image)
                except Exception as exc:
                    raise ParseError(exc) from exc
            return tag
        finally:
            # only close the handle we opened ourselves
            if should_close_file:
                file_obj.close()

    @classmethod
    def is_supported(cls, filename: bytes | str | PathLike[Any]) -> bool:
        """Check if a specific file is supported based on its file
        extension."""
        filename_str = fsdecode(filename)
        return cls._get_parser_for_filename(filename_str) is not None

    def as_dict(self) -> dict[str, str | float | list[str]]:
        """Return a flat dictionary representation of available
        metadata."""
        fields: dict[str, str | float | list[str]] = {}
        for key, value in self.__dict__.items():
            if key.startswith('_'):
                continue
            if isinstance(value, Images):
                continue  # images are exposed via Images.as_dict() instead
            if not isinstance(value, OtherFields):
                if value is None:
                    continue
                # string fields (except filename) are wrapped in lists so
                # they merge with additional values from 'other'
                if key != 'filename' and isinstance(value, str):
                    fields[key] = [value]
                else:
                    fields[key] = value
                continue
            for other_key, other_values in value.items():
                other_fields = fields.get(other_key)
                if not isinstance(other_fields, list):
                    other_fields = fields[other_key] = []
                other_fields += other_values
        return fields

    @classmethod
    def _get_parser_for_filename(cls, filename: str) -> type[TinyTag] | None:
        # Map file extensions to parser classes (built once, then cached)
        if cls._file_extension_mapping is None:
            cls._file_extension_mapping = {
                ('.mp1', '.mp2', '.mp3'): _ID3,
                ('.oga', '.ogg', '.opus', '.spx'): _Ogg,
                ('.wav',): _Wave,
                ('.flac',): _Flac,
                ('.wma',): _Wma,
                ('.m4b', '.m4a', '.m4r', '.m4v', '.mp4',
                 '.aax', '.aaxc'): _MP4,
                ('.aiff', '.aifc', '.aif', '.afc'): _Aiff,
            }
        filename = filename.lower()
        for ext, tagclass in cls._file_extension_mapping.items():
            if filename.endswith(ext):
                return tagclass
        return None

    @classmethod
    def _get_parser_for_file_handle(
        cls,
        filehandle: BinaryIO
    ) -> type[TinyTag] | None:
        # https://en.wikipedia.org/wiki/List_of_file_signatures
        header = filehandle.read(35)
        filehandle.seek(0)
        if header.startswith(b'ID3') or header.startswith(b'\xff\xfb'):
            return _ID3
        if header.startswith(b'fLaC'):
            return _Flac
        if ((header[4:8] == b'ftyp'
             and header[8:11] in {b'M4A', b'M4B', b'aax'})
                or b'\xff\xf1' in header):
            return _MP4
        if (header.startswith(b'OggS')
                and (header[29:33] == b'FLAC' or header[29:35] == b'vorbis'
                     or header[28:32] == b'Opus' or header[29:34] == b'Speex')):
            return _Ogg
        if header.startswith(b'RIFF') and header[8:12] == b'WAVE':
            return _Wave
        if header.startswith(b'\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00'
                             b'\xAA\x00\x62\xCE\x6C'):
            return _Wma
        if header.startswith(b'FORM') and header[8:12] in {b'AIFF', b'AIFC'}:
            return _Aiff
        return None

    @classmethod
    def _get_parser_class(
        cls,
        filename: str | None = None,
        filehandle: BinaryIO | None = None
    ) -> type[TinyTag]:
        # subclasses parse themselves; only the base class dispatches
        if cls != TinyTag:
            return cls
        if filename:
            parser_class = cls._get_parser_for_filename(filename)
            if parser_class is not None:
                return parser_class
        # try determining the file type by magic byte header
        if filehandle:
            parser_class = cls._get_parser_for_file_handle(filehandle)
            if parser_class is not None:
                return parser_class
        raise UnsupportedFormatError(
            'No tag reader found to support file type')

    def _load(self, tags: bool, duration: bool, image: bool = False) -> None:
        # Parse tags and/or duration from the file handle.
        self._parse_tags = tags
        self._parse_duration = duration
        self._load_image = image
        if self._filehandler is None:
            raise ValueError("File handle is required")
        if tags:
            self._parse_tag(self._filehandler)
        if duration:
            if tags:  # rewind file if the tags were already parsed
                self._filehandler.seek(0)
            self._determine_duration(self._filehandler)

    def _set_field(self, fieldname: str, value: str | float,
                   check_conflict: bool = True) -> None:
        # Store a parsed value either on a direct attribute or, when
        # prefixed with 'other.', in the 'other' dictionary.
        if fieldname.startswith(self._OTHER_PREFIX):
            fieldname = fieldname[len(self._OTHER_PREFIX):]
            if check_conflict and fieldname in self.__dict__:
                # avoid shadowing a direct attribute of the same name
                fieldname = '_' + fieldname
            if fieldname not in self.other:
                self.other[fieldname] = []
            self.other[fieldname].append(str(value))
            if _DEBUG:
                # NOTE: closing quote after {value} was previously missing
                print(f'Adding value "{value}" to field "{fieldname}"')
            return
        old_value = self.__dict__.get(fieldname)
        new_value = value
        if isinstance(new_value, str):
            # First value goes in tag, others in tag.other
            values = new_value.split('\x00')
            for index, i_value in enumerate(values):
                if index or old_value and i_value != old_value:
                    self._set_field(
                        self._OTHER_PREFIX + fieldname, i_value,
                        check_conflict=False)
                    continue
                new_value = i_value
            if old_value:
                return
        elif not new_value and old_value:
            # Prioritize non-zero integer values
            return
        if _DEBUG:
            print(f'Setting field "{fieldname}" to "{new_value!r}"')
        self.__dict__[fieldname] = new_value

    def _determine_duration(self, fh: BinaryIO) -> None:
        # implemented by format-specific subclasses
        raise NotImplementedError

    def _parse_tag(self, fh: BinaryIO) -> None:
        # implemented by format-specific subclasses
        raise NotImplementedError

    def _update(self, other: TinyTag) -> None:
        # update the values of this tag with the values from another tag
        for key, value in other.__dict__.items():
            if key.startswith('_'):
                continue
            if isinstance(value, OtherFields):
                for other_key, other_values in other.other.items():
                    for other_value in other_values:
                        self._set_field(
                            self._OTHER_PREFIX + other_key, other_value,
                            check_conflict=False)
            elif isinstance(value, Images):
                self.images._update(value)  # pylint: disable=protected-access
            elif value is not None:
                self._set_field(key, value)

    @staticmethod
    def _unpad(s: str) -> str:
        # certain strings *may* be terminated with a zero byte at the end
        return s.strip('\x00')

    def get_image(self) -> bytes | None:
        """Deprecated, use 'images.any' instead."""
        from warnings import warn  # pylint: disable=import-outside-toplevel
        warn('get_image() is deprecated, and will be removed in the future. '
             "Use 'images.any' instead.",
             DeprecationWarning, stacklevel=2)
        image = self.images.any
        return image.data if image is not None else None

    @property
    def audio_offset(self) -> None:  # pylint: disable=useless-return
        """Obsolete."""
        from warnings import warn  # pylint: disable=import-outside-toplevel
        warn("'audio_offset' attribute is obsolete, and will be "
             'removed in the future',
             DeprecationWarning, stacklevel=2)
        return None

    @property
    def extra(self) -> dict[str, str]:
        """Deprecated, use 'other' instead."""
        from warnings import warn  # pylint: disable=import-outside-toplevel
        warn("'extra' attribute is deprecated, and will be "
             "removed in the future. Use 'other' instead.",
             DeprecationWarning, stacklevel=2)
        extra_keys = {'copyright', 'initial_key', 'isrc', 'lyrics', 'url'}
        return {k: v[0] for k, v in self.other.items() if k in extra_keys}
class Images:
    """A class containing images embedded in an audio file."""

    _OTHER_PREFIX = 'other.'

    def __init__(self) -> None:
        self.front_cover: Image | None = None
        self.back_cover: Image | None = None
        self.media: Image | None = None
        self.other: _ImageListDict = OtherImages()
        self.__dict__: dict[str, Image | OtherImages | None]

    @property
    def any(self) -> Image | None:
        """Return a cover image.

        If not present, fall back to any other available image.
        """
        for value in self.__dict__.values():
            if isinstance(value, OtherImages):
                for other_images in value.values():
                    for image in other_images:
                        return image
                continue
            if value is not None:
                return value
        return None

    def as_dict(self) -> dict[str, list[Image]]:
        """Return a flat dictionary representation of available images."""
        images: dict[str, list[Image]] = {}
        for key, value in self.__dict__.items():
            if not isinstance(value, OtherImages):
                if value is not None:
                    images[key] = [value]
                continue
            for other_key, other_values in value.items():
                other_images = images.get(other_key)
                if not isinstance(other_images, list):
                    other_images = images[other_key] = []
                other_images += other_values
        return images

    def _set_field(self, fieldname: str, value: Image) -> None:
        # First occurrence of a direct field (front_cover/back_cover/media)
        # is stored as an attribute; 'other.'-prefixed names and duplicates
        # go into the 'other' dictionary.
        old_value = self.__dict__.get(fieldname)
        if fieldname.startswith(self._OTHER_PREFIX) or old_value is not None:
            # BUGFIX: only strip the prefix when it is actually present.
            # Previously the slice was applied unconditionally, so a second
            # 'front_cover' was stored as other['cover'] and a second
            # 'media' as other[''].
            if fieldname.startswith(self._OTHER_PREFIX):
                fieldname = fieldname[len(self._OTHER_PREFIX):]
            other_values = self.other.get(fieldname, [])
            other_values.append(value)
            if _DEBUG:
                print(f'Setting other image field "{fieldname}"')
            self.other[fieldname] = other_values
            return
        if _DEBUG:
            print(f'Setting image field "{fieldname}"')
        self.__dict__[fieldname] = value

    def _update(self, other: Images) -> None:
        # merge all images from another Images instance into this one
        for key, value in other.__dict__.items():
            if isinstance(value, OtherImages):
                for other_key, other_values in value.items():
                    for image_other in other_values:
                        self._set_field(
                            self._OTHER_PREFIX + other_key, image_other)
                continue
            if value is not None:
                self._set_field(key, value)
class Image:
    """A class representing an image embedded in an audio file."""

    def __init__(self,
                 name: str,
                 data: bytes,
                 mime_type: str | None = None) -> None:
        self.name = name
        self.data = data
        self.mime_type = mime_type
        self.description: str | None = None

    def __repr__(self) -> str:
        # Show all attributes, truncating large image payloads so the
        # representation stays readable.
        attrs = dict(vars(self))
        blob = attrs.get("data")
        if blob is not None and len(blob) > 45:
            attrs["data"] = blob[:45] + b'..'
        body = ', '.join(f'{key}={val!r}' for key, val in attrs.items())
        return f'{type(self).__name__}({body})'
# At runtime the base alias _StringListDict is a plain `dict`; when type
# checking it is Dict[str, List[str]].
class OtherFields(_StringListDict):
    """A dictionary containing additional metadata fields of an audio file.

    Maps field names to lists of string values.
    """
# At runtime the base alias _ImageListDict is a plain `dict`; when type
# checking it is Dict[str, List[Image]].
class OtherImages(_ImageListDict):
    """A dictionary containing additional images embedded in an audio file.

    Maps image field names to lists of Image objects.
    """
class _MP4(TinyTag):
    """MP4 Audio Parser.

    https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/Metadata/Metadata.html
    https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap2/qtff2.html
    """

    # Freeform ('----') atom names mapped to TinyTag field names
    _CUSTOM_FIELD_NAME_MAPPING = {
        'artists': 'artist',
        'conductor': 'other.conductor',
        'discsubtitle': 'other.set_subtitle',
        'initialkey': 'other.initial_key',
        'isrc': 'other.isrc',
        'language': 'other.language',
        'lyricist': 'other.lyricist',
        'media': 'other.media',
        'website': 'other.url',
        'license': 'other.license',
        'barcode': 'other.barcode',
        'catalognumber': 'other.catalog_number',
    }
    # 'covr' data atom type codes to MIME types
    _IMAGE_MIME_TYPES = {
        13: 'image/jpeg',
        14: 'image/png'
    }
    # struct formats for big-endian signed integers, keyed by byte length
    _UNPACK_FORMATS = {
        1: '>b',
        2: '>h',
        4: '>i',
        8: '>q'
    }
    _VERSIONED_ATOMS = {b'meta', b'stsd'}  # those have an extra 4 byte header
    _FLAGGED_ATOMS = {b'stsd'}  # these also have an extra 4 byte header
    _ILST_PATH = [b'ftyp', b'moov', b'udta', b'meta', b'ilst']
    # parser trees, built lazily on first use
    _audio_data_tree: _DataTreeDict | None = None
    _meta_data_tree: _DataTreeDict | None = None

    def _determine_duration(self, fh: BinaryIO) -> None:
        # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap3/qtff3.html
        if _MP4._audio_data_tree is None:
            _MP4._audio_data_tree = {
                b'moov': {
                    b'mvhd': _MP4._parse_mvhd,
                    b'trak': {b'mdia': {b"minf": {b"stbl": {b"stsd": {
                        b'mp4a': _MP4._parse_audio_sample_entry_mp4a,
                        b'alac': _MP4._parse_audio_sample_entry_alac
                    }}}}}
                }
            }
        self._traverse_atoms(fh, path=_MP4._audio_data_tree)

    def _parse_tag(self, fh: BinaryIO) -> None:
        # The parser tree: Each key is an atom name which is traversed if
        # existing. Leaves of the parser tree are callables which receive
        # the atom data. Callables return {fieldname: value} which updates
        # the TinyTag.
        if _MP4._meta_data_tree is None:
            _MP4._meta_data_tree = {b'moov': {b'udta': {b'meta': {b'ilst': {
                # http://atomicparsley.sourceforge.net/mpeg-4files.html
                # https://metacpan.org/dist/Image-ExifTool/source/lib/Image/ExifTool/QuickTime.pm#L3093
                b'\xa9ART': {b'data': _MP4._data_parser('artist')},
                b'\xa9alb': {b'data': _MP4._data_parser('album')},
                b'\xa9cmt': {b'data': _MP4._data_parser('comment')},
                b'\xa9com': {b'data': _MP4._data_parser('composer')},
                b'\xa9con': {b'data': _MP4._data_parser('other.conductor')},
                b'\xa9day': {b'data': _MP4._data_parser('year')},
                b'\xa9des': {b'data': _MP4._data_parser('other.description')},
                b'\xa9dir': {b'data': _MP4._data_parser('other.director')},
                b'\xa9gen': {b'data': _MP4._data_parser('genre')},
                b'\xa9grp': {b'data': _MP4._data_parser('other.grouping')},
                b'\xa9lyr': {b'data': _MP4._data_parser('other.lyrics')},
                b'\xa9mvc': {
                    b'data': _MP4._data_parser('other.movement_total')
                },
                b'\xa9mvi': {b'data': _MP4._data_parser('other.movement')},
                b'\xa9mvn': {
                    b'data': _MP4._data_parser('other.movement_name')
                },
                b'\xa9nam': {b'data': _MP4._data_parser('title')},
                b'\xa9pub': {b'data': _MP4._data_parser('other.publisher')},
                b'\xa9too': {b'data': _MP4._data_parser('other.encoded_by')},
                b'\xa9wrk': {b'data': _MP4._data_parser('other.work')},
                b'\xa9wrt': {b'data': _MP4._data_parser('composer')},
                b'aART': {b'data': _MP4._data_parser('albumartist')},
                b'cprt': {b'data': _MP4._data_parser('other.copyright')},
                b'desc': {b'data': _MP4._data_parser('other.description')},
                b'disk': {b'data': _MP4._nums_parser('disc', 'disc_total')},
                b'gnre': {b'data': _MP4._parse_id3v1_genre},
                b'shwm': {b'data': _MP4._data_parser('other.show_movement')},
                b'trkn': {b'data': _MP4._nums_parser('track', 'track_total')},
                b'tmpo': {b'data': _MP4._data_parser('other.bpm')},
                b'covr': {b'data': _MP4._parse_cover_image},
                b'----': _MP4._parse_custom_field,
            }}}}}
        self._traverse_atoms(fh, path=_MP4._meta_data_tree)

    def _traverse_atoms(self,
                        fh: BinaryIO,
                        path: _DataTreeDict,
                        stop_pos: int | None = None,
                        curr_path: list[bytes] | None = None) -> None:
        # Walk the atom structure of the file, dispatching to the parser
        # tree: dict nodes recurse, callable leaves consume atom payloads.
        header_len = ext_size_len = 8
        atom_header = fh.read(header_len)
        while len(atom_header) == header_len:
            atom_size = unpack('>I', atom_header[:4])[0]
            atom_type = atom_header[4:]
            if curr_path is None:  # keep track how we traversed in the tree
                curr_path = [atom_type]
            if atom_size == 1:  # 64-bit size
                ext_size_header = fh.read(ext_size_len)
                if len(ext_size_header) == ext_size_len:
                    atom_size = unpack('>Q', ext_size_header)[0] - ext_size_len
            atom_size -= header_len
            if atom_size <= 0:  # empty atom, jump to next one
                atom_header = fh.read(header_len)
                continue
            if _DEBUG:
                print(f'{" " * 4 * len(curr_path)} '
                      f'pos: {fh.tell() - header_len} '
                      f'atom: {atom_type!r} len: {atom_size + header_len}')
            if atom_type in self._VERSIONED_ATOMS:  # jump atom version for now
                fh.seek(4, SEEK_CUR)
                atom_size -= 4
            if atom_type in self._FLAGGED_ATOMS:  # jump atom flags for now
                fh.seek(4, SEEK_CUR)
                atom_size -= 4
            sub_path = path.get(atom_type, None)
            # if the path leaf is a dict, traverse deeper into the tree:
            if isinstance(sub_path, dict):
                atom_end_pos = fh.tell() + atom_size
                self._traverse_atoms(fh, path=sub_path, stop_pos=atom_end_pos,
                                     curr_path=curr_path + [atom_type])
            # if the path-leaf is a callable, call it on the atom data
            elif callable(sub_path):
                for fieldname, value in sub_path(fh.read(atom_size)).items():
                    if _DEBUG:
                        print(' ' * 4 * len(curr_path), 'FIELD: ', fieldname)
                    if isinstance(value, Image):
                        if self._load_image:
                            # pylint: disable=protected-access
                            self.images._set_field(
                                fieldname[len('images.'):], value)
                    elif isinstance(value, list):
                        for subval in value:
                            self._set_field(fieldname, subval)
                    else:
                        self._set_field(fieldname, value)
            # unknown data atom, try to parse it
            elif curr_path == self._ILST_PATH:
                atom_end_pos = fh.tell() + atom_size
                field_name = (
                    self._OTHER_PREFIX + atom_type.decode('latin-1').lower()
                )
                # rewind to the atom header and re-traverse it with an
                # ad-hoc parser that stores the value under 'other.<name>'
                fh.seek(-header_len, SEEK_CUR)
                self._traverse_atoms(
                    fh,
                    path={atom_type: {b'data': self._data_parser(field_name)}},
                    stop_pos=atom_end_pos, curr_path=curr_path + [atom_type])
            # if no action was specified using dict or callable, jump over atom
            else:
                fh.seek(atom_size, SEEK_CUR)
            # check if we have reached the end of this branch:
            if stop_pos and fh.tell() >= stop_pos:
                return  # return to parent (next parent node in tree)
            atom_header = fh.read(header_len)  # read next atom

    @classmethod
    def _data_parser(cls, fieldname: str) -> Callable[[bytes], dict[str, str]]:
        # Build a parser for a 'data' atom that stores its payload as a
        # string under the given field name.
        def _parse_data_atom(data_atom: bytes) -> dict[str, str]:
            data_type = unpack('>I', data_atom[:4])[0]
            data = data_atom[8:]
            value = None
            if data_type == 1:  # UTF-8 string
                value = data.decode('utf-8', 'replace')
            elif data_type == 21:  # BE signed integer
                fmts = cls._UNPACK_FORMATS
                data_len = len(data)
                if data_len in fmts:
                    value = str(unpack(fmts[data_len], data)[0])
            if value:
                return {fieldname: value}
            return {}
        return _parse_data_atom

    @classmethod
    def _nums_parser(
        cls, fieldname1: str, fieldname2: str
    ) -> Callable[[bytes], dict[str, int]]:
        # Build a parser for number-pair atoms such as 'trkn' and 'disk'.
        def _parse_nums(data_atom: bytes) -> dict[str, int]:
            number_data = data_atom[8:14]
            numbers = unpack('>3H', number_data)
            # for some reason the first number is always irrelevant.
            return {fieldname1: numbers[1], fieldname2: numbers[2]}
        return _parse_nums

    @classmethod
    def _parse_id3v1_genre(cls, data_atom: bytes) -> dict[str, str]:
        """Map a 'gnre' atom value to an ID3v1 genre name."""
        # dunno why genre is offset by -1 but that's how mutagen does it
        idx = unpack('>H', data_atom[8:])[0] - 1
        result = {}
        # pylint: disable=protected-access
        # lower bound guards against a raw value of 0 (idx == -1), which
        # would otherwise silently return the last genre via negative
        # indexing
        if 0 <= idx < len(_ID3._ID3V1_GENRES):
            result['genre'] = _ID3._ID3V1_GENRES[idx]
        return result

    @classmethod
    def _parse_cover_image(cls, data_atom: bytes) -> dict[str, Image]:
        """Parse a 'covr' atom into a front cover Image."""
        data_type = unpack('>I', data_atom[:4])[0]
        image = Image(
            'front_cover', data_atom[8:], cls._IMAGE_MIME_TYPES.get(data_type))
        return {'images.front_cover': image}

    @classmethod
    def _read_extended_descriptor(cls, esds_atom: BinaryIO) -> None:
        # skip up to four 0x80 extended-length bytes of an ES descriptor
        for _i in range(4):
            if esds_atom.read(1) != b'\x80':
                break

    @classmethod
    def _parse_custom_field(cls, data: bytes) -> dict[str, list[str]]:
        """Parse a freeform '----' atom ('name' plus 'data' sub-atoms)."""
        fh = BytesIO(data)
        header_len = 8
        field_name = None
        values = []
        atom_header = fh.read(header_len)
        while len(atom_header) == header_len:
            atom_size = unpack('>I', atom_header[:4])[0] - header_len
            atom_type = atom_header[4:]
            if atom_type == b'name':
                atom_value = fh.read(atom_size)[4:].lower()
                field_name = atom_value.decode('utf-8', 'replace')
                # pylint: disable=protected-access
                field_name = cls._CUSTOM_FIELD_NAME_MAPPING.get(
                    field_name, TinyTag._OTHER_PREFIX + field_name)
            elif atom_type == b'data' and field_name:
                data_atom = fh.read(atom_size)
                parser = cls._data_parser(field_name)
                atom_values = parser(data_atom)
                if field_name in atom_values:
                    values.append(atom_values[field_name])
            else:
                fh.seek(atom_size, SEEK_CUR)
            atom_header = fh.read(header_len)  # read next atom
        if field_name and values:
            return {field_name: values}
        return {}

    @classmethod
    def _parse_audio_sample_entry_mp4a(cls, data: bytes) -> dict[str, int]:
        """Parse an 'mp4a' sample entry for channels/samplerate/bitrate."""
        # this atom also contains the esds atom:
        # https://ffmpeg.org/doxygen/0.6/mov_8c-source.html
        # http://xhelmboyx.tripod.com/formats/mp4-layout.txt
        # http://sasperger.tistory.com/103
        # jump over version and flags
        channels = unpack('>H', data[16:18])[0]
        # jump over bit_depth, QT compr id & pkt size
        sr = unpack('>I', data[22:26])[0]
        # ES Description Atom
        esds_atom_size = unpack('>I', data[28:32])[0]
        esds_atom = BytesIO(data[36:36 + esds_atom_size])
        esds_atom.seek(5, SEEK_CUR)  # jump over version, flags and tag
        # ES Descriptor
        cls._read_extended_descriptor(esds_atom)
        esds_atom.seek(4, SEEK_CUR)  # jump over ES id, flags and tag
        # Decoder Config Descriptor
        cls._read_extended_descriptor(esds_atom)
        esds_atom.seek(9, SEEK_CUR)
        avg_br = unpack('>I', esds_atom.read(4))[0] / 1000  # kbit/s
        return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br}

    @classmethod
    def _parse_audio_sample_entry_alac(cls, data: bytes) -> dict[str, int]:
        """Parse an 'alac' sample entry (magic cookie) for audio props."""
        # https://github.com/macosforge/alac/blob/master/ALACMagicCookieDescription.txt
        bitdepth = data[45]
        channels = data[49]
        avg_br, sr = unpack('>II', data[56:64])
        avg_br /= 1000  # kbit/s
        return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br,
                'bitdepth': bitdepth}

    @classmethod
    def _parse_mvhd(cls, data: bytes) -> dict[str, float]:
        """Parse the movie header atom for the overall duration."""
        # http://stackoverflow.com/a/3639993/1191373
        version = data[0]
        # jump over flags, create & mod times
        if version == 0:  # uses 32 bit integers for timestamps
            time_scale, duration = unpack('>II', data[12:20])
        else:  # version == 1:  # uses 64-bit integers for timestamps
            time_scale, duration = unpack('>IQ', data[20:32])
        return {'duration': duration / time_scale}
class _ID3(TinyTag):
"""MP3 Parser."""
_ID3_MAPPING = {
# Mapping from Frame ID to a field of the TinyTag
# https://exiftool.org/TagNames/ID3.html
b'COMM': 'comment', b'COM': 'comment',
b'TRCK': 'track', b'TRK': 'track',
b'TYER': 'year', b'TYE': 'year', b'TDRC': 'year',
b'TALB': 'album', b'TAL': 'album',
b'TPE1': 'artist', b'TP1': 'artist',
b'TIT2': 'title', b'TT2': 'title',
b'TCON': 'genre', b'TCO': 'genre',
b'TPOS': 'disc', b'TPA': 'disc',
b'TPE2': 'albumartist', b'TP2': 'albumartist',
b'TCOM': 'composer', b'TCM': 'composer',
b'WOAR': 'other.url', b'WAR': 'other.url',
b'TSRC': 'other.isrc', b'TRC': 'other.isrc',
b'TCOP': 'other.copyright', b'TCR': 'other.copyright',
b'TBPM': 'other.bpm', b'TBP': 'other.bpm',
b'TKEY': 'other.initial_key', b'TKE': 'other.initial_key',
b'TLAN': 'other.language', b'TLA': 'other.language',
b'TPUB': 'other.publisher', b'TPB': 'other.publisher',
b'USLT': 'other.lyrics', b'ULT': 'other.lyrics',
b'TPE3': 'other.conductor', b'TP3': 'other.conductor',
b'TEXT': 'other.lyricist', b'TXT': 'other.lyricist',
b'TSST': 'other.set_subtitle',
b'TENC': 'other.encoded_by', b'TEN': 'other.encoded_by',
b'TSSE': 'other.encoder_settings', b'TSS': 'other.encoder_settings',
b'TMED': 'other.media', b'TMT': 'other.media',
b'WCOP': 'other.license',
b'MVNM': 'other.movement_name',
b'MVIN': 'other.movement',
b'GRP1': 'modern_grouping', b'GP1': 'modern_grouping',
b'TIT1': 'legacy_grouping', b'TT1': 'legacy_grouping',
}
_ID3_MAPPING_CUSTOM = {
'artists': 'artist',
'director': 'other.director',
'license': 'other.license',
'barcode': 'other.barcode',
'catalognumber': 'other.catalog_number',
'showmovement': 'other.show_movement'
}
_EMPTY_FRAME_IDS = {b'\x00\x00\x00\x00', b'\x00\x00\x00'}
_IMAGE_FRAME_IDS = {b'APIC', b'PIC'}
_CUSTOM_FRAME_IDS = {b'TXXX', b'TXX'}
_SYNCED_LYRICS_FRAME_IDS = {b'SYLT', b'SLT'}
_IGNORED_FRAME_IDS = {
b'AENC', b'CRA',
b'APIC', b'PIC',
b'ASPI',
b'ATXT',
b'CHAP',
b'COMR',
b'CRM',
b'CTOC',
b'ENCR',
b'EQU2', b'EQU',
b'ETCO', b'ETC',
b'GEOB', b'GEO',
b'GRID',
b'LINK', b'LNK',
b'MCDI', b'MCI',
b'MLLT', b'MLL',
b'PCNT', b'CNT',
b'POPM', b'POP',
b'POSS',
b'PRIV',
b'RBUF', b'BUF',
b'RGAD',
b'RVA2', b'RVA',
b'RVRB', b'REV',
b'SEEK',
b'SIGN',
b'SYTC', b'STC',
}
_ID3V1_TAG_SIZE = 128
_MAX_ESTIMATION_SEC = 30.0
_CBR_DETECTION_FRAME_COUNT = 5
_USE_XING_HEADER = True # much faster, but can be deactivated for testing
_ID3V1_GENRES = (
'Blues', 'Classic Rock', 'Country', 'Dance', 'Disco',
'Funk', 'Grunge', 'Hip-Hop', 'Jazz', 'Metal', 'New Age', 'Oldies',
'Other', 'Pop', 'R&B', 'Rap', 'Reggae', 'Rock', 'Techno', 'Industrial',
'Alternative', 'Ska', 'Death Metal', 'Pranks', 'Soundtrack',
'Euro-Techno', 'Ambient', 'Trip-Hop', 'Vocal', 'Jazz+Funk', 'Fusion',
'Trance', 'Classical', 'Instrumental', 'Acid', 'House', 'Game',
'Sound Clip', 'Gospel', 'Noise', 'AlternRock', 'Bass', 'Soul', 'Punk',
'Space', 'Meditative', 'Instrumental Pop', 'Instrumental Rock',
'Ethnic', 'Gothic', 'Darkwave', 'Techno-Industrial', 'Electronic',
'Pop-Folk', 'Eurodance', 'Dream', 'Southern Rock', 'Comedy', 'Cult',
'Gangsta', 'Top 40', 'Christian Rap', 'Pop/Funk', 'Jungle',
'Native American', 'Cabaret', 'New Wave', 'Psychadelic', 'Rave',
'Showtunes', 'Trailer', 'Lo-Fi', 'Tribal', 'Acid Punk', 'Acid Jazz',
'Polka', 'Retro', 'Musical', 'Rock & Roll', 'Hard Rock',
# Wimamp Extended Genres
'Folk', 'Folk-Rock', 'National Folk', 'Swing', 'Fast Fusion', 'Bebob',
'Latin', 'Revival', 'Celtic', 'Bluegrass', 'Avantgarde', 'Gothic Rock',
'Progressive Rock', 'Psychedelic Rock', 'Symphonic Rock', 'Slow Rock',
'Big Band', 'Chorus', 'Easy listening', 'Acoustic', 'Humour', 'Speech',
'Chanson', 'Opera', 'Chamber Music', 'Sonata', 'Symphony',
'Booty Bass', 'Primus', 'Porn Groove', 'Satire', 'Slow Jam', 'Club',
'Tango', 'Samba', 'Folklore', 'Ballad', 'Power Ballad',
'Rhythmic Soul', 'Freestyle', 'Duet', 'Punk Rock', 'Drum Solo',
'A capella', 'Euro-House', 'Dance Hall', 'Goa', 'Drum & Bass',
'Club-House', 'Hardcore Techno', 'Terror', 'Indie', 'BritPop',
'Afro-Punk', 'Polsk Punk', 'Beat', 'Christian Gangsta Rap',
'Heavy Metal', 'Black Metal', 'Contemporary Christian',
'Christian Rock',
# WinAmp 1.91
'Merengue', 'Salsa', 'Thrash Metal', 'Anime', 'Jpop', 'Synthpop',
# WinAmp 5.6
'Abstract', 'Art Rock', 'Baroque', 'Bhangra', 'Big Beat', 'Breakbeat',
'Chillout', 'Downtempo', 'Dub', 'EBM', 'Eclectic', 'Electro',
'Electroclash', 'Emo', 'Experimental', 'Garage', 'Illbient',
'Industro-Goth', 'Jam Band', 'Krautrock', 'Leftfield', 'Lounge',
'Math Rock', 'New Romantic', 'Nu-Breakz', 'Post-Punk', 'Post-Rock',
'Psytrance', 'Shoegaze', 'Space Rock', 'Trop Rock', 'World Music',
'Neoclassical', 'Audiobook', 'Audio Theatre', 'Neue Deutsche Welle',
'Podcast', 'Indie Rock', 'G-Funk', 'Dubstep', 'Garage Rock',
'Psybient',
)
_ID3V2_2_IMAGE_FORMATS = {
b'bmp': 'image/bmp',
b'jpg': 'image/jpeg',
b'png': 'image/png',
}
_IMAGE_TYPES = (
'other.generic',
'other.icon',
'other.alt_icon',
'front_cover',
'back_cover',
'other.leaflet',
'media',
'other.lead_artist',
'other.artist',
'other.conductor',
'other.band',
'other.composer',
'other.lyricist',
'other.recording_location',
'other.during_recording',
'other.during_performance',
'other.screen_capture',
'other.bright_colored_fish',
'other.illustration',
'other.band_logo',
'other.publisher_logo',
)
_UNKNOWN_IMAGE_TYPE = 'other.unknown'
# see this page for the magic values used in mp3:
# http://www.mpgedit.org/mpgedit/mpeg_format/mpeghdr.htm
_SAMPLE_RATES = (
(11025, 12000, 8000), # MPEG 2.5
(0, 0, 0), # reserved
(22050, 24000, 16000), # MPEG 2
(44100, 48000, 32000), # MPEG 1
)
_V1L1 = (0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416,
448, 0)
_V1L2 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320,
384, 0)
_V1L3 = (0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256,
320, 0)
_V2L1 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224,
256, 0)
_V2L2 = (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160, 0)
_V2L3 = _V2L2
_NONE = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
_BITRATE_VERSION_LAYERS = (
# note that layers go from 3 to 1 by design, first layer id is reserved
(_NONE, _V2L3, _V2L2, _V2L1), # MPEG Version 2.5
(_NONE, _NONE, _NONE, _NONE), # reserved
(_NONE, _V2L3, _V2L2, _V2L1), # MPEG Version 2
(_NONE, _V1L3, _V1L2, _V1L1), # MPEG Version 1
)
_SAMPLES_PER_FRAME = 1152 # the default frame size for mp3
_CHANNELS_PER_CHANNEL_MODE = (
2, # 00 Stereo
2, # 01 Joint stereo (Stereo)
2, # 10 Dual channel (2 mono channels)
1, # 11 Single channel (Mono)
)
def __init__(self) -> None:
    """Initialize MP3 parser state on top of the base TinyTag fields."""
    super().__init__()
    # save position after the ID3 tag for duration measurement speedup
    self._bytepos_after_id3v2 = -1
    # values collected from GRP1 (modern) and TIT1 (legacy) grouping
    # frames; presumably reconciled after parsing — handling is not
    # visible in this chunk
    self._modern_grouping_values: list[str] = []
    self._legacy_grouping_values: list[str] = []
@staticmethod
def _parse_xing_header(fh: BinaryIO) -> tuple[int, int]:
    """Read a Xing VBR header and return (frame count, byte count).

    Either value is 0 when the corresponding flag is absent.
    see: http://www.mp3-tech.org/programmer/sources/vbrheadersdk.zip
    """
    fh.seek(4, SEEK_CUR)  # skip over the 'Xing' signature itself
    (flags,) = unpack('>i', fh.read(4))
    frame_total = 0
    total_bytes = 0
    if flags & 0x01:  # frame count field present
        (frame_total,) = unpack('>i', fh.read(4))
    if flags & 0x02:  # byte count field present
        (total_bytes,) = unpack('>i', fh.read(4))
    if flags & 0x04:  # TOC (seek table) present - not needed, skip it
        fh.seek(100, SEEK_CUR)
    if flags & 0x08:  # VBR quality scale present - not needed, skip it
        fh.seek(4, SEEK_CUR)
    return frame_total, total_bytes
def _determine_duration(self, fh: BinaryIO) -> None:
    """Walk MPEG frame headers to fill duration, bitrate and channels.

    Uses a Xing/VBR header from the first frame when available;
    otherwise averages frame sizes, short-circuiting for CBR files.
    """
    # if tag reading was disabled, find start position of audio data
    if self._bytepos_after_id3v2 == -1:
        self._parse_id3v2_header(fh)
    max_estimation_frames = (
        (self._MAX_ESTIMATION_SEC * 44100) // self._SAMPLES_PER_FRAME)
    frame_size_accu = 0
    audio_offset = self._bytepos_after_id3v2
    frames = 0  # count frames for determining mp3 duration
    bitrate_accu = 0  # add up bitrates to find average bitrate to detect
    last_bitrates = set()  # CBR mp3s (multiple frames with same bitrates)
    # seek to first position after id3 tag (speedup for large header)
    first_mpeg_id = None
    fh.seek(self._bytepos_after_id3v2)
    while True:
        # reading through garbage until 11 '1' sync-bits are found
        header = fh.read(4)
        header_len = len(header)
        if header_len < 4:
            if frames:
                self.bitrate = bitrate_accu / frames
            break  # EOF
    # unpack the 4 header bytes into the individual bit fields
        _sync, conf, bitrate_freq, rest = unpack('4B', header)
        br_id = (bitrate_freq >> 4) & 0x0F  # bitrate id
        sr_id = (bitrate_freq >> 2) & 0x03  # sample rate id
        padding = 1 if bitrate_freq & 0x02 > 0 else 0
        mpeg_id = (conf >> 3) & 0x03
        layer_id = (conf >> 1) & 0x03
        channel_mode = (rest >> 6) & 0x03
        # check for eleven 1s, validate bitrate and sample rate
        if (header[:2] <= b'\xFF\xE0'
                or (first_mpeg_id is not None and first_mpeg_id != mpeg_id)
                or br_id > 14 or br_id == 0 or sr_id == 3 or layer_id == 0
                or mpeg_id == 1):
            # invalid frame, find next sync header
            idx = header.find(b'\xFF', 1)
            next_offset = header_len
            if idx != -1:
                next_offset -= idx
                fh.seek(idx - header_len, SEEK_CUR)
            if frames == 0:
                # no valid frame seen yet: audio data starts later
                audio_offset += next_offset
            continue
        if first_mpeg_id is None:
            first_mpeg_id = mpeg_id
        self.channels = self._CHANNELS_PER_CHANNEL_MODE[channel_mode]
        frame_br = self._BITRATE_VERSION_LAYERS[mpeg_id][layer_id][br_id]
        self.samplerate = samplerate = self._SAMPLE_RATES[mpeg_id][sr_id]
        # NOTE(review): this is the layer II/III frame-size formula;
        # layer I frames would be mis-sized — confirm that is intended
        frame_length = (144000 * frame_br) // samplerate + padding
        # There might be a xing header in the first frame that contains
        # all the info we need, otherwise parse multiple frames to find the
        # accurate average bitrate
        if frames == 0 and self._USE_XING_HEADER:
            prev_offset = header_len + audio_offset
            frame_content = fh.read(frame_length)
            xing_header_offset = frame_content.find(b'Xing')
            if xing_header_offset != -1:
                fh.seek(prev_offset + xing_header_offset)
                xframes, byte_count = self._parse_xing_header(fh)
                if xframes > 0 and byte_count > 0:
                    # MPEG-2 Audio Layer III uses 576 samples per frame
                    samples_pf = self._SAMPLES_PER_FRAME
                    if mpeg_id <= 2:
                        samples_pf = 576
                    self.duration = dur = xframes * samples_pf / samplerate
                    self.bitrate = byte_count * 8 / dur / 1000
                    return
            # no usable Xing header: rewind and scan frames normally
            fh.seek(prev_offset)
        frames += 1  # it's most probably a mp3 frame
        bitrate_accu += frame_br
        if frames <= self._CBR_DETECTION_FRAME_COUNT:
            last_bitrates.add(frame_br)
        frame_size_accu += frame_length
        # if bitrate does not change over time its probably CBR
        is_cbr = (frames == self._CBR_DETECTION_FRAME_COUNT
                  and len(last_bitrates) == 1)
        if frames == max_estimation_frames or is_cbr:
            # try to estimate duration from the average frame size
            stream_size = (
                self.filesize - audio_offset - self._ID3V1_TAG_SIZE)
            est_frame_count = stream_size / (frame_size_accu / frames)
            samples = est_frame_count * self._SAMPLES_PER_FRAME
            self.duration = samples / samplerate
            self.bitrate = bitrate_accu / frames
            return
        if frame_length > 1:  # jump over current frame body
            fh.seek(frame_length - header_len, SEEK_CUR)
    if self.samplerate:
        self.duration = frames * self._SAMPLES_PER_FRAME / self.samplerate
def _parse_tag(self, fh: BinaryIO) -> None:
    """Read ID3v2 from the front of the file, then ID3v1 from the back."""
    self._parse_id3v2(fh)
    tail_size = self._ID3V1_TAG_SIZE
    if self.filesize < tail_size:
        return  # file too small to hold an ID3v1 tag
    # try parsing id3v1 in the last bytes of the file
    fh.seek(self.filesize - tail_size)
    self._parse_id3v1(fh)
def _parse_id3v2_header(self, fh: BinaryIO) -> tuple[int, bool, int]:
    """Read the 10-byte ID3v2 header at the current position.

    Returns (tag size, has extended header, major version); size and
    version are 0 when no tag is present. Also records the position
    after the tag in ``self._bytepos_after_id3v2``.
    For info on the specs, see: http://id3.org/Developer%20Information
    """
    tag_size = 0
    major_version = 0
    has_extended = False
    header = fh.read(10)
    # check if there is an ID3v2 tag at the beginning of the file
    if header.startswith(b'ID3'):
        major_version = header[3]
        if _DEBUG:
            print(f'Found id3 v2.{major_version}')
        has_extended = (header[5] & 0x40) > 0
        # tag size is stored as a 4-byte synchsafe integer
        tag_size = self._unsynchsafe(unpack('4B', header[6:10]))
    self._bytepos_after_id3v2 = tag_size
    return tag_size, has_extended, major_version
def _parse_id3v2(self, fh: BinaryIO) -> None:
    """Parse all frames of an ID3v2 tag, if one is present."""
    tag_size, has_extended, major = self._parse_id3v2_header(fh)
    if tag_size <= 0:
        return  # no tag at all
    end_pos = fh.tell() + tag_size
    if has_extended:
        # just read over the extended header; its first four bytes
        # carry its own synchsafe size
        ext_size = self._unsynchsafe(unpack('4B', fh.read(6)[:4]))
        fh.seek(ext_size - 6, SEEK_CUR)
    consumed = 0
    while consumed < tag_size:
        frame_size = self._parse_frame(fh, tag_size, id3version=major)
        if frame_size == -1:  # padding or invalid frame ends the tag
            break
        consumed += frame_size
    fh.seek(end_pos)  # continue right after the whole tag
    self._set_grouping_work_fields()
def _parse_id3v1(self, fh: BinaryIO) -> None:
    """Parse a 128-byte ID3v1 tag at the current position.

    Only fills fields that ID3v2 left empty, as ID3v1 tags are more
    likely to be outdated or have encoding issues.
    """
    content = fh.read(3 + 30 + 30 + 30 + 4 + 30 + 1)
    if content[:3] != b'TAG':  # not an ID3v1 tag after all
        return

    def decode(raw: bytes) -> str:
        return self._unpad(
            raw.decode(self._default_encoding or 'latin1', 'replace'))

    # fixed-width text fields and their byte ranges within the tag
    for name, begin, end in (('title', 3, 33), ('artist', 33, 63),
                             ('album', 63, 93), ('year', 93, 97)):
        if not getattr(self, name):
            text = decode(content[begin:end])
            if text:
                self._set_field(name, text)
    comment = content[97:127]
    # ID3v1.1: a zero byte followed by a nonzero byte at the end of
    # the comment field stores the track number
    if b'\x00\x00' < comment[-2:] < b'\x01\x00':
        if self.track is None:
            self._set_field('track', ord(comment[-1:]))
        comment = comment[:-2]
    if not self.comment:
        text = decode(comment)
        if text:
            self._set_field('comment', text)
    if not self.genre:
        genre_id = ord(content[127:128])
        if genre_id < len(self._ID3V1_GENRES):
            self._set_field('genre', self._ID3V1_GENRES[genre_id])
def _parse_custom_field(self, content: str) -> bool:
    """Parse a null-separated 'name\\x00value' pair into a tag field.

    Returns True when a field was set, False for malformed content.
    """
    raw_name, sep, payload = content.partition('\x00')
    key = raw_name.lower()
    payload = payload.lstrip('\ufeff')  # drop a stray BOM
    if not (key and sep and payload):
        return False
    target = self._ID3_MAPPING_CUSTOM.get(
        key, self._OTHER_PREFIX + key)
    self._set_field(target, payload)
    return True
def _set_grouping_work_fields(self) -> None:
    """Resolve buffered GRP1/TIT1 values after the whole tag was read.

    iTunes 12.5.4.42 added a new GRP1 frame for 'grouping', and
    repurposed the TIT1 frame for 'work'. Handle this mess here.
    """
    if not self._modern_grouping_values:
        # no GRP1 frames: TIT1 keeps its traditional grouping meaning
        for legacy in self._legacy_grouping_values:
            self._set_field('other.grouping', legacy)
        return
    for modern in self._modern_grouping_values:
        self._set_field('other.grouping', modern)
    for legacy in self._legacy_grouping_values:
        self._set_field('other.work', legacy)
@classmethod
def _create_tag_image(cls,
                      data: bytes,
                      pic_type: int,
                      mime_type: str | None = None,
                      description: str | None = None) -> tuple[str, Image]:
    """Build an Image from raw picture data.

    Returns (field name, Image). Unknown or out-of-range picture
    types map to ``cls._UNKNOWN_IMAGE_TYPE`` instead of raising.
    """
    field_name = cls._UNKNOWN_IMAGE_TYPE
    # bug fix: bound must be strict — with the previous '<=' check a
    # pic_type equal to len(_IMAGE_TYPES) passed validation and the
    # lookup below raised IndexError on malformed files
    if 0 <= pic_type < len(cls._IMAGE_TYPES):
        field_name = cls._IMAGE_TYPES[pic_type]
    name = field_name
    if field_name.startswith(cls._OTHER_PREFIX):
        name = field_name[len(cls._OTHER_PREFIX):]
    image = Image(name, data)
    if mime_type:
        image.mime_type = mime_type
    if description:
        image.description = description
    return field_name, image
def _parse_image(self,
                 frame_id: bytes,
                 content: bytes) -> tuple[str, Image]:
    """Parse an ID3 picture frame (PIC/APIC) into (field name, Image).

    See section 4.14: http://id3.org/id3v2.4.0-frames
    """
    encoding = content[:1]  # text encoding byte of the description
    if frame_id == b'PIC':  # ID3 v2.2:
        # v2.2 stores a 3-letter image format instead of a MIME type
        imgformat = content[1:4].lower()
        mime_type = self._ID3V2_2_IMAGE_FORMATS.get(imgformat)
        # skip encoding (1), imgformat (3), pictype(1)
        desc_start_pos = 5
    else:  # ID3 v2.3+
        mime_start_pos = 1
        mime_end_pos = self._find_string_end_pos(
            content, start_pos=mime_start_pos)
        mime_type = self._decode_string(
            content[mime_start_pos:mime_end_pos]).lower()
        # skip mtype, pictype(1)
        desc_start_pos = mime_end_pos + 1
    # the picture type byte sits directly before the description
    pic_type = content[desc_start_pos - 1]
    desc_end_pos = self._find_string_end_pos(
        content, encoding, desc_start_pos)
    # skip stray null byte in broken file
    if (desc_end_pos + 1 < len(content)
            and content[desc_end_pos] == 0
            and content[desc_end_pos + 1] != 0):
        desc_end_pos += 1
    desc = self._decode_string(
        encoding + content[desc_start_pos:desc_end_pos])
    # all remaining bytes are the raw image data
    return self._create_tag_image(
        content[desc_end_pos:], pic_type, mime_type, desc)
@staticmethod
def _lrc_timestamp(seconds: float) -> str:
cs = int(seconds * 100)
minutes, cs = divmod(cs, 6000)
seconds, cs = divmod(cs, 100)
return f"{minutes:02d}:{seconds:02d}.{cs:02d}"
def _parse_synced_lyrics(self, content: bytes) -> str:
    """Convert an ID3 SYLT (synced lyrics) frame to LRC-format text."""
    content_length = len(content)
    encoding = content[:1]
    # skip language (3)
    timestamp_format = content[4:5]  # 0x02 = absolute milliseconds
    # skip content type (1)
    start_pos = 6
    # the first string is the content descriptor; skip past it
    end_pos = self._find_string_end_pos(content, encoding, start_pos)
    lyrics = ""
    offset = end_pos
    found_line = False
    while offset < content_length:
        # each entry: a text string followed by a 4-byte big-endian time
        end_pos = self._find_string_end_pos(content, encoding, offset)
        value = self._decode_string(
            encoding + content[offset:end_pos]).lstrip('\n')
        offset = end_pos
        time = unpack('>I', content[offset:offset + 4])[0]
        offset += 4
        if found_line:
            lyrics += '\n'
        found_line = True
        if timestamp_format == b'\x02':
            # time in milliseconds
            timestamp = self._lrc_timestamp(time / 1000)
        else:
            # other timestamp formats (e.g. MPEG frames): keep text only
            lyrics += value
            continue
        lyrics += f'[{timestamp}]{value}'
    return lyrics
def _parse_frame(self,
                 fh: BinaryIO,
                 total_size: int,
                 id3version: int | None = None) -> int:
    """Parse one ID3v2 frame at the current position.

    Returns the frame's payload size, or -1 when parsing should stop
    (truncated header, padding reached, or oversized frame).
    """
    # ID3v2.2 especially ugly. see: http://id3.org/id3v2-00
    header_len = 6 if id3version == 2 else 10
    frame_size_bytes = 3 if id3version == 2 else 4
    is_synchsafe_int = id3version == 4  # v2.4 stores sizes synchsafe
    header = fh.read(header_len)
    if len(header) != header_len:
        return -1  # truncated frame header
    frame_id = header[:frame_size_bytes]
    if frame_id in self._EMPTY_FRAME_IDS:
        return -1  # padding area reached
    frame_size: int
    if frame_size_bytes == 3:
        frame_size = unpack('>I', b'\x00' + header[3:6])[0]
    elif is_synchsafe_int:
        frame_size = self._unsynchsafe(unpack('4B', header[4:8]))
    else:
        frame_size = unpack('>I', header[4:8])[0]
    if _DEBUG:
        print(f'Found id3 Frame {frame_id!r} at '
              f'{fh.tell()}-{fh.tell() + frame_size} of {self.filesize}')
    if frame_size > total_size:
        # invalid frame size, stop here
        return -1
    should_set_field = True
    if self._parse_tags and frame_id in self._ID3_MAPPING:
        fieldname = self._ID3_MAPPING[frame_id]
        # comment/lyrics frames carry a 3-byte language code
        language = fieldname in {'comment', 'other.lyrics'}
        value = self._decode_string(fh.read(frame_size), language)
        if not value:
            return frame_size
        if fieldname == "comment":
            # check if comment is a key-value pair (used by iTunes)
            should_set_field = not self._parse_custom_field(value)
        elif fieldname in {'track', 'disc', 'other.movement'}:
            # values may come as 'current/total'
            if '/' in value:
                value, total = value.split('/')[:2]
                if total.isdecimal():
                    self._set_field(f'{fieldname}_total', int(total))
            if value.isdecimal():
                self._set_field(fieldname, int(value))
            should_set_field = False
        elif fieldname == 'genre':
            genre_id = 255
            # funky: id3v1 genre hidden in a id3v2 field
            if value.isdecimal():
                genre_id = int(value)
            # funkier: the TCO may contain genres in parens, e.g '(13)'
            elif value.startswith('('):
                end_pos = value.find(')')
                parens_text = value[1:end_pos]
                if end_pos > 0 and parens_text.isdecimal():
                    genre_id = int(parens_text)
            if 0 <= genre_id < len(self._ID3V1_GENRES):
                value = self._ID3V1_GENRES[genre_id]
        elif fieldname == 'modern_grouping':
            # defer GRP1/TIT1 resolution to _set_grouping_work_fields()
            self._modern_grouping_values.append(value)
            should_set_field = False
        elif fieldname == 'legacy_grouping':
            self._legacy_grouping_values.append(value)
            should_set_field = False
        if should_set_field:
            self._set_field(fieldname, value)
    elif self._parse_tags and frame_id in self._SYNCED_LYRICS_FRAME_IDS:
        lyrics = self._parse_synced_lyrics(fh.read(frame_size))
        self._set_field('other.lyrics', lyrics)
    elif self._parse_tags and frame_id in self._CUSTOM_FRAME_IDS:
        # custom fields
        value = self._decode_string(fh.read(frame_size))
        if value:
            self._parse_custom_field(value)
    elif self._parse_tags and frame_id not in self._IGNORED_FRAME_IDS:
        # unknown, try to add to other dict
        # NOTE(review): this branch precedes the image branch below, so
        # when tags are parsed, image frame ids are presumably listed in
        # _IGNORED_FRAME_IDS, or they would be decoded as text — confirm
        value = self._decode_string(fh.read(frame_size))
        if value:
            self._set_field(
                self._OTHER_PREFIX + frame_id.decode('latin-1').lower(),
                value)
    elif self._load_image and frame_id in self._IMAGE_FRAME_IDS:
        field_name, image = self._parse_image(
            frame_id, fh.read(frame_size))
        # pylint: disable=protected-access
        self.images._set_field(field_name, image)
    else:  # skip frame
        fh.seek(frame_size, SEEK_CUR)
    return frame_size
@staticmethod
def _find_string_end_pos(content: bytes,
encoding: bytes = b'\x00',
start_pos: int = 0) -> int:
# latin1 and utf-8 are 1 byte
if encoding in {b'\x00', b'\x03'}:
return content.find(b'\x00', start_pos) + 1
end_pos = 0
for i in range(start_pos, len(content), 2):
if content[i:i + 2] == b'\x00\x00':
end_pos = i + 2
break
return end_pos
def _decode_string(self, value: bytes, language: bool = False) -> str:
    """Decode an ID3 text payload whose first byte selects the encoding.

    When ``language`` is True, a leading 3-letter language code (used
    by comment/lyrics frames) is stripped as well.
    """
    default_encoding = 'ISO-8859-1'
    if self._default_encoding:
        default_encoding = self._default_encoding
    # it's not my fault, this is the spec.
    first_byte = value[:1]
    if first_byte == b'\x00':  # ISO-8859-1
        value = value[1:]
        encoding = default_encoding
    elif first_byte == b'\x01':  # UTF-16 with BOM
        value = value[1:]
        # remove language (but leave BOM)
        if language:
            if value[3:5] in {b'\xfe\xff', b'\xff\xfe'}:
                value = value[3:]
            if value[:3].isalpha():
                value = value[3:]  # remove language
            # strip optional additional null bytes
            value = value.lstrip(b'\x00')
        # read byte order mark to determine endianness
        encoding = ('UTF-16be' if value.startswith(b'\xfe\xff')
                    else 'UTF-16le')
        # strip the bom if it exists
        if value.startswith(b'\xfe\xff') or value.startswith(b'\xff\xfe'):
            # also drop a stray trailing byte so the length stays even
            value = value[2:] if len(value) % 2 == 0 else value[2:-1]
        # remove ADDITIONAL OTHER BOM :facepalm:
        if value.startswith(b'\x00\x00\xff\xfe'):
            value = value[4:]
    elif first_byte == b'\x02':  # UTF-16 without BOM
        # strip optional null byte, if byte count uneven
        value = value[1:-1] if len(value) % 2 == 0 else value[1:]
        encoding = 'UTF-16be'
    elif first_byte == b'\x03':  # UTF-8
        value = value[1:]
        encoding = 'UTF-8'
    else:
        encoding = default_encoding  # wild guess
    if language and value[:3].isalpha():
        value = value[3:]  # remove language
    return self._unpad(value.decode(encoding, 'replace'))
@staticmethod
def _unsynchsafe(ints: tuple[int, ...]) -> int:
return (ints[0] << 21) + (ints[1] << 14) + (ints[2] << 7) + ints[3]
class _Ogg(TinyTag):
    """OGG Parser."""

    # Vorbis comment keys -> tinytag field names
    _VORBIS_MAPPING = {
        'album': 'album',
        'albumartist': 'albumartist',
        'title': 'title',
        'artist': 'artist',
        'artists': 'artist',
        'author': 'artist',
        'date': 'year',
        'tracknumber': 'track',
        'tracktotal': 'track_total',
        'totaltracks': 'track_total',
        'discnumber': 'disc',
        'disctotal': 'disc_total',
        'totaldiscs': 'disc_total',
        'genre': 'genre',
        'description': 'comment',
        'comment': 'comment',
        'comments': 'comment',
        'composer': 'composer',
        'bpm': 'other.bpm',
        'copyright': 'other.copyright',
        'isrc': 'other.isrc',
        'lyrics': 'other.lyrics',
        'unsyncedlyrics': 'other.lyrics',
        'publisher': 'other.publisher',
        'language': 'other.language',
        'director': 'other.director',
        'website': 'other.url',
        'conductor': 'other.conductor',
        'lyricist': 'other.lyricist',
        'discsubtitle': 'other.set_subtitle',
        'setsubtitle': 'other.set_subtitle',
        'initialkey': 'other.initial_key',
        'key': 'other.initial_key',
        'encodedby': 'other.encoded_by',
        'encodersettings': 'other.encoder_settings',
        'media': 'other.media',
        'license': 'other.license',
        'barcode': 'other.barcode',
        'catalognumber': 'other.catalog_number',
        'movementname': 'other.movement_name',
        'movement': 'other.movement',
        'movementtotal': 'other.movement_total',
        'showmovement': 'other.show_movement',
        'grouping': 'other.grouping',
        'contentgroup': 'other.grouping',
        'work': 'other.work'
    }

    def __init__(self) -> None:
        """Set up OGG-specific parser state."""
        super().__init__()
        # granule position of the stream (sample count, set by
        # _parse_pages while walking pages)
        self._granule_pos = 0
        self._pre_skip = 0  # number of samples to skip in opus stream
        self._audio_size: int | None = None  # size of opus audio stream

    def _determine_duration(self, fh: BinaryIO) -> None:
        """Compute duration (and, for opus, bitrate) from granule data."""
        if not self._tags_parsed:
            self._parse_tag(fh)  # determine sample rate
        if self.duration is not None or not self.samplerate:
            return  # either ogg flac or invalid file
        self.duration = max(
            (self._granule_pos - self._pre_skip) / self.samplerate, 0
        )
        if self._audio_size is None or not self.duration:
            return  # not an opus file
        self.bitrate = self._audio_size * 8 / self.duration / 1000

    def _parse_tag(self, fh: BinaryIO) -> None:
        """Walk ogg packets and dispatch on the embedded codec headers."""
        check_flac_second_packet = False
        check_speex_second_packet = False
        for packet in self._parse_pages(fh):
            if packet.startswith(b"\x01vorbis"):
                # vorbis identification header
                if self._parse_duration:
                    self.channels, self.samplerate = unpack(
                        "<Bi", packet[11:16])
                    self.bitrate = unpack("<i", packet[20:24])[0] / 1000
            elif packet.startswith(b"\x03vorbis"):
                # vorbis comment header
                if self._parse_tags:
                    walker = BytesIO(packet)
                    walker.seek(7)  # jump over header name
                    self._parse_vorbis_comment(walker)
            elif packet.startswith(b'OpusHead'):
                if self._parse_duration:  # parse opus header
                    # https://www.videolan.org/developers/vlc/modules/codec/opus_header.c
                    # https://mf4.xiph.org/jenkins/view/opus/job/opusfile-unix/ws/doc/html/structOpusHead.html
                    version, ch, pre_skip = unpack("<BBH", packet[8:12])
                    if (version & 0xF0) == 0:  # only major version 0 supported
                        self.channels = ch
                        self.samplerate = 48000
                        self._pre_skip = pre_skip
            elif packet.startswith(b'OpusTags'):
                if self._parse_tags:  # parse opus metadata:
                    walker = BytesIO(packet)
                    walker.seek(8)  # jump over header name
                    self._parse_vorbis_comment(walker)
                self._audio_size = 0  # start counting size of audio stream
            elif packet.startswith(b'\x7fFLAC'):
                # delegate to the FLAC parser for the embedded stream
                # https://xiph.org/flac/ogg_mapping.html
                walker = BytesIO(packet)
                # jump over header name, version and number of headers
                walker.seek(9)
                # pylint: disable=protected-access
                flactag = _Flac()
                flactag._filehandler = walker
                flactag.filesize = self.filesize
                flactag._load(
                    tags=self._parse_tags, duration=self._parse_duration,
                    image=self._load_image)
                self._update(flactag)
                check_flac_second_packet = True
            elif check_flac_second_packet:
                # second packet contains FLAC metadata block
                if self._parse_tags:
                    walker = BytesIO(packet)
                    meta_header = walker.read(4)
                    block_type = meta_header[0] & 0x7f
                    # pylint: disable=protected-access
                    if block_type == _Flac._VORBIS_COMMENT:
                        self._parse_vorbis_comment(walker)
                check_flac_second_packet = False
            elif packet.startswith(b'Speex '):
                # https://speex.org/docs/manual/speex-manual/node8.html
                if self._parse_duration:
                    self.samplerate = unpack("<i", packet[36:40])[0]
                    self.channels, self.bitrate = unpack("<ii", packet[48:56])
                check_speex_second_packet = True
            elif check_speex_second_packet:
                if self._parse_tags:
                    walker = BytesIO(packet)
                    # starts with a comment string
                    length = unpack('I', walker.read(4))[0]
                    comment = walker.read(length).decode('utf-8', 'replace')
                    self._set_field('comment', comment)
                    # other tags
                    self._parse_vorbis_comment(walker, has_vendor=False)
                check_speex_second_packet = False
            else:
                # Optimization: If we need to determine the duration, read
                # granule_pos of remaining pages, but skip contents of
                # segments. If we don't need the duration, stop here.
                self._tags_parsed = True
                if not self._parse_duration:
                    return
        self._tags_parsed = True

    def _parse_vorbis_comment(self,
                              fh: BinaryIO,
                              has_vendor: bool = True) -> None:
        """Parse a Vorbis comment block into tag fields/images.

        for the spec, see: http://xiph.org/vorbis/doc/v-comment.html
        discnumber tag based on: https://en.wikipedia.org/wiki/Vorbis_comment
        https://sno.phy.queensu.ca/~phil/exiftool/TagNames/Vorbis.html
        """
        if has_vendor:
            vendor_length = unpack('I', fh.read(4))[0]
            fh.seek(vendor_length, SEEK_CUR)  # jump over vendor
        elements = unpack('I', fh.read(4))[0]
        for _i in range(elements):
            length = unpack('I', fh.read(4))[0]
            keyvalpair = fh.read(length).decode('utf-8', 'replace')
            if '=' in keyvalpair:
                key, value = keyvalpair.split('=', 1)
                key_lower = key.lower()
                if key_lower == "metadata_block_picture":
                    # images are base64-encoded FLAC picture blocks
                    if self._load_image:
                        if _DEBUG:
                            print('Found Vorbis Image', key, value[:64])
                        # pylint: disable=protected-access
                        fieldname, fieldvalue = _Flac._parse_image(
                            BytesIO(a2b_base64(value)))
                        self.images._set_field(fieldname, fieldvalue)
                else:
                    if _DEBUG:
                        print('Found Vorbis Comment', key, value[:64])
                    fieldname = self._VORBIS_MAPPING.get(
                        key_lower, self._OTHER_PREFIX + key_lower)
                    if fieldname in {
                        'track', 'disc', 'track_total', 'disc_total'
                    }:
                        # split 'current/total' style values
                        if fieldname in {'track', 'disc'} and '/' in value:
                            value, total = value.split('/')[:2]
                            if total.isdecimal():
                                self._set_field(
                                    f'{fieldname}_total', int(total))
                        if value.isdecimal():
                            self._set_field(fieldname, int(value))
                    elif value:
                        self._set_field(fieldname, value)

    def _parse_pages(self, fh: BinaryIO) -> Iterator[bytearray]:
        """Yield complete ogg packets, tracking granule pos/audio size.

        for the spec, see: https://wiki.xiph.org/Ogg
        """
        packet_data = bytearray()
        current_serial = None  # serial of the first logical stream
        last_granule_pos = 0
        last_audio_size = 0
        header_len = 27
        page_header = fh.read(header_len)  # read ogg page header
        while len(page_header) == header_len:
            version = page_header[4]
            if page_header[:4] != b'OggS' or version != 0:
                raise ParseError('Invalid OGG header')
            # https://xiph.org/ogg/doc/framing.html
            header_type = page_header[5]
            eos = header_type & 0x04  # end-of-stream flag
            granule_pos, serial = unpack('<qI', page_header[6:18])
            if current_serial is None:
                current_serial = serial  # lock on to the first stream
            serial_match = serial == current_serial
            if serial_match and granule_pos > 0:
                # use the previous page's granule position unless this
                # is the final page of the stream
                if eos:
                    self._granule_pos = granule_pos
                else:
                    self._granule_pos = last_granule_pos
                last_granule_pos = granule_pos
            segments = page_header[26]
            seg_sizes = unpack('B' * segments, fh.read(segments))
            read_size = 0
            audio_size = 0
            for seg_size in seg_sizes:  # read all segments
                read_size += seg_size
                if self._audio_size is not None:
                    audio_size += seg_size
                # less than 255 bytes means end of packet
                if seg_size < 255 and serial_match and not self._tags_parsed:
                    packet_data += fh.read(read_size)
                    yield packet_data
                    packet_data.clear()
                    read_size = 0
            if read_size:
                if not serial_match or self._tags_parsed:
                    # skip other streams' data / unneeded contents
                    fh.seek(read_size, SEEK_CUR)
                else:  # packet continues on next page
                    packet_data += fh.read(read_size)
            if serial_match and self._audio_size is not None:
                # only count the last page's audio data when the stream
                # actually ends on it
                if eos:
                    self._audio_size += last_audio_size + audio_size
                else:
                    self._audio_size += last_audio_size
                last_audio_size = audio_size
            if eos:
                break
            page_header = fh.read(header_len)
class _Wave(TinyTag):
    """WAVE Parser.

    https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html
    """

    # RIFF INFO chunk ids -> tinytag field names
    _RIFF_MAPPING = {
        b'INAM': 'title',
        b'TITL': 'title',
        b'IPRD': 'album',
        b'IART': 'artist',
        b'IBPM': 'other.bpm',
        b'ICMT': 'comment',
        b'IMUS': 'composer',
        b'ICOP': 'other.copyright',
        b'ICRD': 'year',
        b'IGNR': 'genre',
        b'ILNG': 'other.language',
        b'ISRC': 'other.isrc',
        b'IPUB': 'other.publisher',
        b'IPRT': 'track',
        b'ITRK': 'track',
        b'TRCK': 'track',
        b'IBSU': 'other.url',
        b'YEAR': 'year',
        b'IWRI': 'other.lyricist',
        b'IENC': 'other.encoded_by',
        b'IMED': 'other.media',
    }

    def _determine_duration(self, fh: BinaryIO) -> None:
        """Duration is computed while parsing chunks; parse if needed."""
        if not self._tags_parsed:
            self._parse_tag(fh)

    def _parse_tag(self, fh: BinaryIO) -> None:
        """Walk RIFF chunks to read format info, duration and tags."""
        # http://www-mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html
        # https://en.wikipedia.org/wiki/WAV
        header = fh.read(12)
        if header[:4] != b'RIFF' or header[8:12] != b'WAVE':
            raise ParseError('Invalid WAV header')
        if self._parse_duration:
            self.bitdepth = 16  # assume 16bit depth (CD quality)
        header_len = 8
        chunk_header = fh.read(header_len)
        while len(chunk_header) == header_len:
            subchunk_id = chunk_header[:4]
            subchunk_size = unpack('I', chunk_header[4:])[0]
            # IFF chunks are padded to an even number of bytes
            subchunk_size += subchunk_size % 2
            if self._parse_duration and subchunk_id == b'fmt ':
                chunk = fh.read(subchunk_size)
                _format_tag, channels, samplerate = unpack('<HHI', chunk[:8])
                bitdepth = unpack('<H', chunk[14:16])[0]
                if bitdepth == 0:
                    # Certain codecs (e.g. GSM 6.10) give us a bit depth of
                    # zero. Avoid division by zero when calculating duration.
                    bitdepth = 1
                self.bitrate = samplerate * channels * bitdepth / 1000
                self.channels, self.samplerate, self.bitdepth = (
                    channels, samplerate, bitdepth)
            elif self._parse_duration and subchunk_id == b'data':
                # duration follows directly from the payload size once
                # the fmt chunk supplied the stream parameters
                if (self.channels is not None and self.samplerate is not None
                        and self.bitdepth is not None):
                    self.duration = (
                        subchunk_size / self.channels / self.samplerate
                        / (self.bitdepth / 8))
                fh.seek(subchunk_size, SEEK_CUR)
            elif self._parse_tags and subchunk_id == b'LIST':
                chunk = fh.read(subchunk_size)
                if chunk.startswith(b'INFO'):
                    walker = BytesIO(chunk)
                    walker.seek(4)  # skip header
                    field = walker.read(4)
                    while len(field) == 4:
                        data_length = unpack('I', walker.read(4))[0]
                        # IFF chunks are padded to an even size
                        data_length += data_length % 2
                        # strip zero-byte
                        data = walker.read(data_length).split(b'\x00', 1)[0]
                        if field in self._RIFF_MAPPING:
                            fieldname = self._RIFF_MAPPING[field]
                            value = data.decode('utf-8', 'replace')
                            if fieldname == 'track':
                                if value.isdecimal():
                                    self._set_field(fieldname, int(value))
                            else:
                                self._set_field(fieldname, value)
                        field = walker.read(4)
            elif self._parse_tags and subchunk_id in {b'id3 ', b'ID3 '}:
                # embedded ID3 tag: delegate to the ID3 parser
                # pylint: disable=protected-access
                id3 = _ID3()
                id3._filehandler = fh
                id3._load(tags=True, duration=False, image=self._load_image)
                self._update(id3)
            else:  # some other chunk, just skip the data
                fh.seek(subchunk_size, SEEK_CUR)
            chunk_header = fh.read(header_len)
        self._tags_parsed = True
class _Flac(TinyTag):
    """FLAC Parser."""

    # metadata block type ids
    _STREAMINFO = 0
    _VORBIS_COMMENT = 4
    _PICTURE = 6

    def _determine_duration(self, fh: BinaryIO) -> None:
        """Duration is computed while parsing blocks; parse if needed."""
        if not self._tags_parsed:
            self._parse_tag(fh)

    def _parse_tag(self, fh: BinaryIO) -> None:
        """Walk FLAC metadata blocks (and a possible leading ID3 tag)."""
        id3 = None
        header = fh.read(4)
        if header.startswith(b'ID3'):  # parse ID3 header if it exists
            fh.seek(-4, SEEK_CUR)
            # pylint: disable=protected-access
            id3 = _ID3()
            id3._parse_tags = self._parse_tags
            id3._load_image = self._load_image
            id3._parse_id3v2(fh)
            header = fh.read(4)  # after ID3 should be fLaC
        if header[:4] != b'fLaC':
            raise ParseError('Invalid FLAC header')
        # for spec, see https://xiph.org/flac/ogg_mapping.html
        header_len = 4
        block_header = fh.read(header_len)
        while len(block_header) == header_len:
            block_type = block_header[0] & 0x7f
            is_last_block = block_header[0] & 0x80
            size = unpack('>I', b'\x00' + block_header[1:])[0]
            # http://xiph.org/flac/format.html#metadata_block_streaminfo
            if self._parse_duration and block_type == self._STREAMINFO:
                head = fh.read(size)
                if len(head) < 34:  # invalid streaminfo
                    break
                # From the xiph documentation:
                # py | <bits>
                # ----------------------------------------------
                # H | <16> The minimum block size (in samples)
                # H | <16> The maximum block size (in samples)
                # 3s | <24> The minimum frame size (in bytes)
                # 3s | <24> The maximum frame size (in bytes)
                # 8B | <20> Sample rate in Hz.
                # | <3> (number of channels)-1.
                # | <5> (bits per sample)-1.
                # | <36> Total samples in stream.
                # 16s| <128> MD5 signature
                # channels--. bits total samples
                # |----- samplerate -----| |-||----| |---------~ ~----|
                # 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000
                # #---4---# #---5---# #---6---# #---7---# #--8-~ ~-12-#
                sr = unpack('>I', b'\x00' + head[10:13])[0] >> 4
                self.channels = ((head[12] >> 1) & 0x07) + 1
                self.bitdepth = (
                    ((head[12] & 1) << 4) + ((head[13] & 0xF0) >> 4) + 1)
                tot_samples_b = bytes([head[13] & 0x0F]) + head[14:18]
                tot_samples = unpack('>Q', b'\x00\x00\x00' + tot_samples_b)[0]
                # NOTE(review): a malformed file with sr == 0 would raise
                # ZeroDivisionError here — confirm intended behavior
                self.duration = duration = tot_samples / sr
                self.samplerate = sr
                if duration > 0:
                    self.bitrate = self.filesize * 8 / duration / 1000
            elif self._parse_tags and block_type == self._VORBIS_COMMENT:
                # reuse the Ogg vorbis comment parser
                # pylint: disable=protected-access
                walker = BytesIO(fh.read(size))
                oggtag = _Ogg()
                oggtag._parse_vorbis_comment(walker)
                self._update(oggtag)
            elif self._load_image and block_type == self._PICTURE:
                fieldname, value = self._parse_image(fh)
                # pylint: disable=protected-access
                self.images._set_field(fieldname, value)
            else:
                fh.seek(size, SEEK_CUR)  # seek over this block
            if is_last_block:
                break
            block_header = fh.read(header_len)
        if id3 is not None:  # apply ID3 tags after vorbis
            self._update(id3)
        self._tags_parsed = True

    @classmethod
    def _parse_image(cls, fh: BinaryIO) -> tuple[str, Image]:
        """Parse a FLAC picture block into (field name, Image)."""
        # https://xiph.org/flac/format.html#metadata_block_picture
        pic_type, mime_type_len = unpack('>II', fh.read(8))
        mime_type = fh.read(mime_type_len).decode('utf-8', 'replace')
        description_len = unpack('>I', fh.read(4))[0]
        description = fh.read(description_len).decode('utf-8', 'replace')
        fh.seek(16, SEEK_CUR)  # jump over width, height, depth, colors
        pic_len = unpack('>I', fh.read(4))[0]
        # pylint: disable=protected-access
        return _ID3._create_tag_image(
            fh.read(pic_len), pic_type, mime_type, description)
class _Wma(TinyTag):
    """WMA Parser.

    http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx
    http://uguisu.skr.jp/Windows/format_asf.html
    """

    # ASF extended content descriptor names -> tinytag field names
    _ASF_MAPPING = {
        'WM/ARTISTS': 'artist',
        'WM/TrackNumber': 'track',
        'WM/PartOfSet': 'disc',
        'WM/Year': 'year',
        'WM/AlbumArtist': 'albumartist',
        'WM/Genre': 'genre',
        'WM/AlbumTitle': 'album',
        'WM/Composer': 'composer',
        'WM/Publisher': 'other.publisher',
        'WM/BeatsPerMinute': 'other.bpm',
        'WM/InitialKey': 'other.initial_key',
        'WM/Lyrics': 'other.lyrics',
        'WM/Language': 'other.language',
        'WM/Director': 'other.director',
        'WM/AuthorURL': 'other.url',
        'WM/ISRC': 'other.isrc',
        'WM/Conductor': 'other.conductor',
        'WM/Writer': 'other.lyricist',
        'WM/SetSubTitle': 'other.set_subtitle',
        'WM/EncodedBy': 'other.encoded_by',
        'WM/EncodingSettings': 'other.encoder_settings',
        'WM/Media': 'other.media',
        'WM/Barcode': 'other.barcode',
        'WM/CatalogNo': 'other.catalog_number',
        'WM/ContentGroupDescription': 'other.grouping',
        'WM/Work': 'other.work'
    }
    # struct formats for WORD/DWORD/QWORD attribute values by byte size
    _UNPACK_FORMATS = {
        1: '<B',
        2: '<H',
        4: '<I',
        8: '<Q'
    }
    # GUIDs identifying the ASF objects handled below
    _ASF_CONTENT_DESC = b'3&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel'
    _ASF_EXT_CONTENT_DESC = (b'@\xa4\xd0\xd2\x07\xe3\xd2\x11\x97\xf0\x00'
                             b'\xa0\xc9^\xa8P')
    _STREAM_BITRATE_PROPS = (b'\xceu\xf8{\x8dF\xd1\x11\x8d\x82\x00`\x97\xc9'
                             b'\xa2\xb2')
    _ASF_FILE_PROP = b'\xa1\xdc\xab\x8cG\xa9\xcf\x11\x8e\xe4\x00\xc0\x0c Se'
    _ASF_STREAM_PROPS = (b'\x91\x07\xdc\xb7\xb7\xa9\xcf\x11\x8e\xe6\x00\xc0'
                         b'\x0c Se')
    _STREAM_TYPE_ASF_AUDIO_MEDIA = b'@\x9ei\xf8M[\xcf\x11\xa8\xfd\x00\x80_\\D+'

    def _determine_duration(self, fh: BinaryIO) -> None:
        """Duration is read while parsing objects; parse if needed."""
        if not self._tags_parsed:
            self._parse_tag(fh)

    def _parse_tag(self, fh: BinaryIO) -> None:
        """Walk top-level ASF objects and extract tags and stream info."""
        # http://www.garykessler.net/library/file_sigs.html
        # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc521913958
        header = fh.read(30)
        if (header[:16] != b'0&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel'
                or header[-1:] != b'\x02'):
            raise ParseError('Invalid WMA header')
        header_len = 24
        object_header = fh.read(header_len)
        while len(object_header) == header_len:
            object_size = unpack('<Q', object_header[16:])[0]
            if object_size == 0 or object_size > self.filesize:
                break  # invalid object, stop parsing.
            object_id = object_header[:16]
            if self._parse_tags and object_id == self._ASF_CONTENT_DESC:
                walker = BytesIO(fh.read(object_size - header_len))
                (title_length, author_length,
                 copyright_length, description_length,
                 rating_length) = unpack('<5H', walker.read(10))
                # five length-prefixed UTF-16 strings follow in order
                data_blocks = {
                    'title': title_length,
                    'artist': author_length,
                    'other.copyright': copyright_length,
                    'comment': description_length,
                    '_rating': rating_length,
                }
                for i_field_name, length in data_blocks.items():
                    value = self._unpad(
                        walker.read(length).decode('utf-16', 'replace'))
                    # a leading underscore marks values read but discarded
                    if not i_field_name.startswith('_') and value:
                        self._set_field(i_field_name, value)
            elif self._parse_tags and object_id == self._ASF_EXT_CONTENT_DESC:
                # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc509555195
                walker = BytesIO(fh.read(object_size - header_len))
                descriptor_count = unpack('<H', walker.read(2))[0]
                for _ in range(descriptor_count):
                    name_len = unpack('<H', walker.read(2))[0]
                    name = self._unpad(
                        walker.read(name_len).decode('utf-16', 'replace'))
                    value_type, value_len = unpack('<HH', walker.read(4))
                    # Unicode string
                    if value_type == 0:
                        value = self._unpad(
                            walker.read(value_len).decode('utf-16', 'replace'))
                    # DWORD / QWORD / WORD
                    elif (1 < value_type < 6
                            and value_len in self._UNPACK_FORMATS):
                        fmt = self._UNPACK_FORMATS[value_len]
                        value = str(unpack(fmt, walker.read(value_len))[0])
                    else:
                        walker.seek(value_len, SEEK_CUR)  # skip other values
                        continue
                    # try to get normalized field name
                    if name in self._ASF_MAPPING:
                        field_name = self._ASF_MAPPING[name]
                    else:  # custom field
                        if name.startswith('WM/'):
                            name = name[3:]
                        field_name = self._OTHER_PREFIX + name.lower()
                    if field_name in {'track', 'disc'}:
                        if isinstance(value, int) or value.isdecimal():
                            self._set_field(field_name, int(value))
                    elif value:
                        self._set_field(field_name, value)
            elif self._parse_duration and object_id == self._ASF_FILE_PROP:
                data = fh.read(object_size - header_len)
                # durations are stored in 100 ns units, preroll in ms
                play_duration = unpack('<Q', data[40:48])[0] / 10000000
                preroll = unpack('<Q', data[56:64])[0] / 1000
                # subtract the preroll to get the actual duration
                self.duration = max(play_duration - preroll, 0.0)
            elif self._parse_duration and object_id == self._ASF_STREAM_PROPS:
                data = fh.read(object_size - header_len)
                stream_type = data[:16]
                if stream_type == self._STREAM_TYPE_ASF_AUDIO_MEDIA:
                    (codec_id_format_tag, self.channels, self.samplerate,
                     avg_bytes_per_second) = unpack('<HHII', data[54:66])
                    self.bitrate = avg_bytes_per_second * 8 / 1000
                    if codec_id_format_tag == 355:  # lossless
                        self.bitdepth = unpack('<H', data[68:70])[0]
            else:
                # skip unknown object ids
                fh.seek(object_size - header_len, SEEK_CUR)
            object_header = fh.read(header_len)
        self._tags_parsed = True
class _Aiff(TinyTag):
    """AIFF Parser.

    https://en.wikipedia.org/wiki/Audio_Interchange_File_Format#Data_format
    https://web.archive.org/web/20171118222232/http://www-mmsp.ece.mcgill.ca/documents/audioformats/aiff/aiff.html
    https://web.archive.org/web/20071219035740/http://www.cnpbagwell.com/aiff-c.txt

    A few things about the spec:

    * IFF strings are not supposed to be null terminated, but sometimes
      are.
    * Some tools might throw more metadata into the ANNO chunk, but it is
      wildly unreliable to count on it. In fact, the official spec
      recommends against using it. That said... this code throws the
      ANNO field into comment and hopes for the best.

    The key thing here is that AIFF metadata is usually in a handful of
    fields and the rest is an ID3 or XMP field. XMP is too complicated
    and only Adobe-related products support it. The vast majority use
    ID3.
    """

    # Plain-text IFF chunk IDs mapped to normalized tinytag field names.
    _AIFF_MAPPING = {
        b'NAME': 'title',
        b'AUTH': 'artist',
        b'ANNO': 'comment',
        b'(c) ': 'other.copyright',
    }

    def _parse_tag(self, fh: BinaryIO) -> None:
        """Walk the IFF chunk list: collect text tags, audio properties
        from the COMM chunk, and any embedded ID3 tag.

        Raises ParseError if the FORM / AIFF(AIFC) container header is
        missing or malformed.
        """
        header = fh.read(12)
        if header[:4] != b'FORM' or header[8:12] not in {b'AIFC', b'AIFF'}:
            raise ParseError('Invalid AIFF header')
        header_len = 8  # 4-byte chunk ID + 4-byte big-endian size
        chunk_header = fh.read(header_len)
        while len(chunk_header) == header_len:
            subchunk_id = chunk_header[:4]
            subchunk_size = unpack('>I', chunk_header[4:])[0]
            # IFF chunks are padded to an even number of bytes
            subchunk_size += subchunk_size % 2
            if self._parse_tags and subchunk_id in self._AIFF_MAPPING:
                value = self._unpad(
                    fh.read(subchunk_size).decode('utf-8', 'replace'))
                self._set_field(self._AIFF_MAPPING[subchunk_id], value)
            elif self._parse_duration and subchunk_id == b'COMM':
                chunk = fh.read(subchunk_size)
                channels, num_frames, bitdepth = unpack('>hLh', chunk[:8])
                self.channels, self.bitdepth = channels, bitdepth
                try:
                    # The sample rate is an 80-bit IEEE 754 extended
                    # precision float: 16 bits of sign+exponent
                    # (bias 0x3FFF) followed by a 64-bit mantissa.
                    exp, mantissa = unpack('>HQ', chunk[8:18])
                    sr = int(mantissa * (2 ** (exp - 0x3FFF - 63)))
                    duration = num_frames / sr
                    bitrate = sr * channels * bitdepth / 1000
                    self.samplerate, self.duration, self.bitrate = (
                        sr, duration, bitrate)
                except (OverflowError, ZeroDivisionError):
                    # Corrupt COMM data: an out-of-range exponent makes
                    # the division overflow a float, and an all-zero
                    # sample rate field yields sr == 0. Skip the chunk
                    # and leave samplerate/duration/bitrate unset
                    # rather than crash on a malformed file.
                    pass
            elif self._parse_tags and subchunk_id in {b'id3 ', b'ID3 '}:
                # Delegate the embedded ID3 block to the ID3 parser at
                # the current file offset, then merge its results.
                # pylint: disable=protected-access
                id3 = _ID3()
                id3._filehandler = fh
                id3._load(tags=True, duration=False, image=self._load_image)
                self._update(id3)
            else:  # some other chunk, just skip the data
                fh.seek(subchunk_size, SEEK_CUR)
            chunk_header = fh.read(header_len)
        self._tags_parsed = True

    def _determine_duration(self, fh: BinaryIO) -> None:
        """Duration comes from the COMM chunk, parsed by _parse_tag."""
        if not self._tags_parsed:
            self._parse_tag(fh)