#!/usr/bin/env python # # iTunesDB generator library for rePear, the iPod database management tool # Copyright (C) 2006-2008 Martin J. Fiedler # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import struct, random, array, sys, os, stat, time from dataclasses import dataclass from enum import StrEnum from functools import cmp_to_key try: from PIL import Image PILAvailable = True except ImportError: PILAvailable = False def DefaultLoggingFunction(text, force_flush=True): sys.stdout.write(text) if force_flush: sys.stdout.flush() log = DefaultLoggingFunction class Field: def __bytes__(self): raise Exception("abstract function call") def __len__(self): raise Exception("abstract function call") class Tags(StrEnum): BD = "mhbd" # Database object, root: everything is a child of this SD = "mhsd" # Data set, stores data of a type LT = "mhlt" # Track list, stores a list of tracks (wow) IT = "mhit" # Track LP = "mhlp" # Stores the playlists YP = "mhyp" # Playlist IP = "mhip" # Playlist item OD = "mhod" # Stores literal data, see http://www.ipodlinux.org/ITunesDB/#Data_Object LA = "mhla" # Album list LF = "mhlf" # File list IF = "mhif" # Image file FD = "mhfd" # Database object of artwork db LI = "mhli" # Image list II = "mhii" # Image object ("Image Item") NI = "mhni" # Image object ("Image Name") class F_Tag(Field): def __init__(self, tag: Tags): self.tag = tag.value.encode() assert isinstance(self.tag, bytes) def __bytes__(self): return self.tag def __len__(self): return len(self.tag) class F_Bytes(Field): def __init__(self, data: bytes, padding: int = 0): self.data = bytes(data) while len(self) < padding: self.data += b"\0" def __bytes__(self): return self.data def __len__(self): return len(self.data) class F_Formatable(Field): def __init__(self, format: str | bytes, value: int): self.format = format self.value = int(value) def __bytes__(self): return struct.pack(f"<{self.format}", self.value) def __len__(self): return struct.calcsize(self.format) class F_Int64(F_Formatable): def __init__(self, value: int): F_Formatable.__init__(self, "Q", value) class F_Int32(F_Formatable): def __init__(self, value: int): F_Formatable.__init__(self, "L", value) class F_Int16(F_Formatable): def __init__(self, value: int): F_Formatable.__init__(self, "H", value) class F_Int8(F_Formatable): def __init__(self, value: int): F_Formatable.__init__(self, "B", value) class F_HeaderLength(F_Int32): def __init__(self): F_Int32.__init__(self, 0) class F_TotalLength(F_Int32): def __init__(self): F_Int32.__init__(self, 0) class F_ChildCount(F_Int32): def __init__(self): F_Int32.__init__(self, 0) class F_Padding(Field): def __init__(self, length): self.length = length def __bytes__(self): return self.length * b"\0" def __len__(self): return self.length class Record: def __init__(self, header): self.header_length_at = None self.total_length_at = None self.child_count_at = None data = b"" for field in header: if isinstance(field, F_HeaderLength): self.header_length_at = len(data) elif isinstance(field, F_TotalLength): self.total_length_at = len(data) elif isinstance(field, F_ChildCount): self.child_count_at = len(data) d = field if isinstance(d, str): d = d.encode() data += bytes(d) if self.header_length_at: data = data[:self.header_length_at] + struct.pack(" val_b: return 1 # type: ignore return 0 def ifelse(condition, then_val, else_val=None): if condition: return then_val else: return else_val MAC_TIME_OFFSET = 2082844800 if time.daylight: tzoffset = time.altzone else: tzoffset = time.timezone def unixtime2mactime(t): if not t: return t return t + MAC_TIME_OFFSET - tzoffset def mactime2unixtime(t): if not t: return t return t - MAC_TIME_OFFSET + tzoffset # "fuzzy" mtime comparison, allows for two types of slight deviations: # 1. differences of exact multiples of one hour (usually time zome problems) # 2. differences of less than 2 seconds (FAT timestamps are imprecise) def compare_mtime(a, b): diff = abs(a - b) if diff > 86402: return False return ((diff % 3600) in (0, 1, 2, 3598, 3599)) class StringDataObject(Record): def __init__(self, mhod_type, content): if isinstance(content, bytes): encoded = content else: encoded = content.encode('utf_16_le', 'replace') super().__init__(( F_Tag(Tags.OD), F_Int32(0x18), # Header size F_TotalLength(), F_Int32(mhod_type), F_Padding(8), # This contains unknown data F_Int32(1), # Position F_Int32(len(encoded)), F_Int32(1), # Not sure what this is, thought before this was a UTF-8/UTF-16 toggle F_Padding(4) # Unknown )) self.add(encoded) class OrderDataObject(Record): def __init__(self, order): super().__init__(( F_Tag(Tags.OD), F_Int32(0x18), # Header size F_Int32(0x2C), # Total lenght, static? F_Int32(100), # Type 100, ipodlinux says "Seems to vary. iTunes uses it for column sizing info as well as an order indicator in playlists." F_Padding(8), # Unknown F_Int32(order), # Position F_Padding(16) # All rest is null )) class TrackItemRecord(Record): def __init__(self, info): if not 'id' in info: raise KeyError("no track ID set") format = info.get('format', "mp3-cbr") if info.get('artwork', None): default_has_artwork = True default_artwork_size = 1 else: default_has_artwork = False default_artwork_size = 0 if 'video format' in info: media_type = 2 else: media_type = 1 super().__init__(( F_Tag(Tags.IT), # This describes a track F_HeaderLength(), F_TotalLength(), F_ChildCount(), # Described as number of strings F_Int32(info.get('id', 0)), # ID for the track, for tracking in the playlists F_Int32(info.get('visible', 1)), # visible, hides the track if 0 F_Bytes({"mp3": " 3PM", "aac": " CAA", "mp4a": "A4PM"}.get(format[:3], "\0\0\0\0").encode()), # File type, mp3, aac or m4a F_Int16({"mp3-cbr": 0x100, "mp3-vbr": 0x101, "aac": 0, "mp4a": 0}.get(format, 0)), # type1 as in the wiki (and type2?) F_Int8(info.get('compilation', 0)), # is this from a compilation F_Int8(info.get('rating', 0)), # rating times 20, not updated by ipod F_Int32(unixtime2mactime(info.get('mtime', 0))), # mod time F_Int32(info.get('size', 0)), # size of track in bytes F_Int32(int(info.get('length', 0) * 1000)), # length of track in milliseconds F_Int32(info.get('track number', 0)), # track number F_Int32(info.get('total tracks', 0)), # album track count F_Int32(info.get('year', 0)), # track year F_Int32(info.get('bitrate', 0)), # bitrate of track F_Int16(0), # sample rate times 0x10000 F_Int16(info.get('sample rate', 0)), # sample rate times 0x10000 (not sure why they split this into two bytes in this impl) F_Int32(info.get('volume', 0)), # -255 to 255 what volume you want at playback F_Int32(info.get('start time', 0)), F_Int32(info.get('stop time', 0)), F_Int32(info.get('soundcheck', 0)), # used to normalize tracks F_Int32(info.get('play count', 0)), F_Int32(0), F_Int32(unixtime2mactime(info.get('last played time', 0))), F_Int32(info.get('disc number', 0)), F_Int32(info.get('total discs', 0)), F_Int32(info.get('user id', 0)), F_Int32(info.get('date added', 0)), F_Int32(int(info.get('bookmark time', 0) * 1000)), F_Int64(info.get('dbid', 0)), F_Int8(info.get('checked', 0)), F_Int8(info.get('application rating', 0)), F_Int16(info.get('BPM', 0)), F_Int16(info.get('artwork count', 1)), F_Int16({"wave": 0, "audible": 1}.get(format, 0xFFFF)), F_Int32(info.get('artwork size', default_artwork_size)), F_Int32(0), F_Formatable("f", info.get('sample rate', 0)), F_Int32(info.get('release date', 0)), F_Int16({"aac": 0x0033, "mp4a": 0x0033, "audible": 0x0029, "wave:": 0}.get(format, 0x0C)), F_Int16(info.get('explicit flag', 0)), F_Padding(8), F_Int32(info.get('skip count', 0)), F_Int32(unixtime2mactime(info.get('last skipped time', 0))), F_Int8(2 - int(info.get('has artwork', default_has_artwork))), F_Int8(not info.get('shuffle flag', 1)), F_Int8(info.get('bookmark flag', 0)), F_Int8(info.get('podcast flag', 0)), F_Int64(info.get('dbid', 0)), F_Int8(info.get('lyrics flag', 0)), F_Int8(info.get('movie flag', 0)), F_Int8(info.get('played mark', 1)), F_Padding(9), F_Int32(ifelse(format[:3]=="mp3", 0, info.get('sample count', 0))), F_Padding(16), F_Int32(media_type), F_Int32(0), # season number F_Int32(0), # episode number F_Padding(28), F_Int32(info.get('gapless data', 0)), F_Int32(0), F_Int16(info.get('gapless track flag', 0)), F_Int16(info.get('gapless album flag', 0)), F_Padding(20), # hash F_Padding(18), # misc unknowns F_Int16(info.get('album id', 0)), F_Padding(52), # padding before mhii link F_Int32(info.get('mhii link', 0)) )) for mhod_type, key in ((1,'title'), (4,'artist'), (3,'album'), (5,'genre'), (6,'filetype'), (2,'path')): if key in info: value = info[key] if key == "path": value = ":" + value.replace("/", ":").replace("\\", ":") self.add(StringDataObject(mhod_type, value)) class PlaylistItemRecord(Record): def __init__(self, order, trackid, timestamp=0): super().__init__(( F_Tag(Tags.IP), # Playlist item F_HeaderLength(), F_TotalLength(), F_ChildCount(), F_Int32(0), F_Int32((trackid + 0x1337) & 0xFFFF), # 1337 is not in the specs... F_Int32(trackid), F_Int32(timestamp), F_Int32(0), F_Padding(40) )) self.add(OrderDataObject(order)) class PlaylistRecord(Record): def __init__(self, name, track_count, order=0, master=0, timestamp=0, plid=None, sort_order=1): if not plid: plid = random.randrange(0, 18446744073709551615) super().__init__(( F_Tag(Tags.YP), F_HeaderLength(), F_TotalLength(), F_ChildCount(), F_Int32(track_count), F_Int32(master), F_Int32(timestamp), F_Int64(plid), F_Int32(0), F_Int16(1), F_Int16(0), F_Int32(sort_order), F_Padding(60) )) self.add(StringDataObject(1, name)) self.add(OrderDataObject(order)) def add_index(self, tracklist, index_type, fields): order = list(range(len(tracklist))) order.sort(key=cmp_to_key(lambda a, b: compare_dict(tracklist[a], tracklist[b], fields))) mhod = Record(( F_Tag(Tags.OD), F_Int32(24), F_TotalLength(), F_Int32(52), F_Padding(8), F_Int32(index_type), F_Int32(len(order)), F_Padding(40) )) arr = array.array('L', order) # the array module doesn't directly support endianness, so we detect # the machine's endianness and swap if it is big-endian if array.array('L', [1]).tobytes()[3] == 1: arr.byteswap() mhod.add(arr) self.add(mhod) def set_playlist(self, track_ids): [self.add(PlaylistItemRecord(i+1, track_ids[i]), 0) for i in range(len(track_ids))] class iTunesDB: def __init__(self, tracklist, name="Unnamed", dbid=None): if not dbid: dbid = random.randrange(0, 0xffffffffffffffff) # random 64 bit integer self.mhbd = Record(( F_Tag(Tags.BD), F_HeaderLength(), F_TotalLength(), F_Int32(0), F_Int32(0x19), F_ChildCount(), F_Int64(dbid), F_Int16(2), F_Padding(14), F_Int16(0), # hash indicator (set later by hash58) F_Padding(20), # first hash F_Bytes(b"en"), # language = 'en' F_Bytes(b"\0rePear!"), # library persistent ID F_Padding(20), # hash58 F_Padding(80) )) self.mhsd = Record(( F_Tag(Tags.SD), F_HeaderLength(), F_TotalLength(), F_Int32(1), F_Padding(80) )) self.mhlt = Record(( F_Tag(Tags.LT), F_HeaderLength(), F_ChildCount(), F_Padding(80) )) for track in tracklist: self.mhlt.add(TrackItemRecord(track)) self.mhsd.add(self.mhlt) del self.mhlt self.mhbd.add(self.mhsd) self.mhsd = Record(( F_Tag(Tags.SD), F_HeaderLength(), F_TotalLength(), F_Int32(2), F_Padding(80) )) self.mhlp = Record(( F_Tag(Tags.LP), F_HeaderLength(), F_ChildCount(), F_Padding(80) )) mhyp = PlaylistRecord(name, len(tracklist), master=1, sort_order=10) mhyp.add_index(tracklist, 0x03, ('title',)) mhyp.add_index(tracklist, 0x04, ('album','disc number','track number','title')) mhyp.add_index(tracklist, 0x05, ('artist','album','disc number','track number','title')) mhyp.add_index(tracklist, 0x07, ('genre','artist','album','disc number','track number','title')) mhyp.add_index(tracklist, 0x12, ('composer','title')) mhyp.set_playlist([track['id'] for track in tracklist]) self.mhlp.add(mhyp) def add_playlist(self, tracks, name="Unnamed"): mhyp = PlaylistRecord(name, len(tracks), sort_order=1) mhyp.set_playlist([track['id'] for track in tracks]) self.mhlp.add(mhyp) def finish(self): self.mhsd.add(self.mhlp) del self.mhlp self.mhbd.add(self.mhsd) del self.mhsd result = bytes(self.mhbd) del self.mhbd return result ################################################################################ ## ArtworkDB / PhotoDB record classes ## ################################################################################ class RGB565_LE: bpp = 16 @staticmethod def convert(data): res = array.array('B', [0 for x in range(len(data)//3*2)]) io = 0 for ii in range(0, len(data), 3): g = ord(data[ii+1]) >> 2 res[io] = ((g & 7) << 5) | (ord(data[ii+2]) >> 3) res[io|1] = (ord(data[ii]) & 0xF8) | (g >> 3) io += 2 return str(res) ImageFormats = { 'nano': ((1027, 100, 100, RGB565_LE), (1031, 42, 42, RGB565_LE)), 'photo': ((1016, 140, 140, RGB565_LE), (1017, 56, 56, RGB565_LE)), 'video': ((1028, 100, 100, RGB565_LE), (1029, 200, 200, RGB565_LE)), 'nano3g': ((1055, 128, 128, RGB565_LE), (1060, 320, 320, RGB565_LE), (1061, 55, 56, RGB565_LE)), 'nano4g': ((1055, 128, 128, RGB565_LE), (1078, 80, 80, RGB565_LE), (1071, 240, 240, RGB565_LE), (1074, 50, 50, RGB565_LE)), '4g': 'photo', '5g': 'video', '6g': 'nano3g', 'classic': 'nano3g', 'nano1g': 'nano', 'nano2g': 'nano', } @dataclass class ImageInfo: format: object = None index: int = 0 sx: int = 0 sy: int = 0 mx: int = 0 my: int = 0 class ArtworkFormat: def __init__(self, descriptor, cache_info=(0,0)): self.fid, self.height, self.width, self.format = descriptor self.filename = "F%04d_1.ithmb" % self.fid self.size = self.width * self.height * self.format.bpp/8 self.fullname = "iPod_Control/Artwork/" + self.filename # check if the cache file can be used try: s = os.stat(self.fullname) use_cache = stat.S_ISREG(s[stat.ST_MODE]) and compare_mtime(cache_info[0], s[stat.ST_MTIME]) and (s[stat.ST_SIZE] == cache_info[1]) except OSError: use_cache = False # load the cache if use_cache: try: with open(self.fullname, "rb") as f: self.cache = f.read() except IOError: use_cache = False if not use_cache: self.cache = None try: self.f = open(self.fullname, "wb") except IOError: log("WARNING: Error opening the artwork data file `%s'\n", self.filename) self.f = None def close(self): if self.f: self.f.close() try: s = os.stat(self.fullname) cache_info = (s[stat.ST_MTIME], s[stat.ST_SIZE]) except OSError: cache_info = (0, 0) return (self.fid, cache_info) def GenerateImage(self, image, index, cache_entry=None): if cache_entry and self.cache: offset = self.size * cache_entry['index'] data = self.cache[offset : offset+self.size] sx = cache_entry['dim'][self.fid]['sx'] sy = cache_entry['dim'][self.fid]['sy'] mx = cache_entry['dim'][self.fid]['mx'] my = cache_entry['dim'][self.fid]['my'] else: log(" [%dx%d]" % (self.width, self.height), True) # sx/sy = resulting image size sx = self.width sy = image.size[1] * sx / image.size[0] if sy > self.height: sy = self.height sx = image.size[0] * sy / image.size[1] # mx/my = margin size mx = self.width - sx my = self.height - sy # process the image temp = image.resize((sx, sy), Image.Resampling.LANCZOS) thumb = Image.new('RGB', (self.width, self.height), (255, 255, 255)) thumb.paste(temp, (mx/2, my/2)) del temp data = self.format.convert(thumb.tobytes()) del thumb # save the image try: assert self.f self.f.seek(self.size * index) self.f.write(data) except IOError: log(" [WRITE ERROR]", True) iinfo = ImageInfo() iinfo.format = self iinfo.index = index iinfo.sx = sx iinfo.sy = sy iinfo.mx = mx iinfo.my = my return iinfo class ArtworkDBStringDataObject(Record): def __init__(self, mhod_type, content): if isinstance(content, bytes): content = content.decode(sys.getfilesystemencoding(), 'replace') elif not isinstance(content, str): content = str(content) content = content.encode('utf_16_le', 'replace') padding = len(content) % 4 if padding: padding = 4 - padding super().__init__(( F_Tag(Tags.OD), F_Int32(0x18), F_TotalLength(), F_Int16(mhod_type), F_Int16(padding), F_Padding(8), F_Int32(len(content)), F_Int32(2), F_Int32(0) )) self.add(content) if padding: self.add("\0" * padding) class ImageDataObject(Record): def __init__(self, iinfo): super().__init__( ( F_Tag(Tags.OD), F_Int32(0x18), F_TotalLength(), F_Int32(2), F_Padding(8) )) mhni = Record(( F_Tag(Tags.NI), F_Int32(0x4C), F_TotalLength(), F_ChildCount(), F_Int32(iinfo.format.fid), F_Int32(iinfo.format.size * iinfo.index), F_Int32(iinfo.format.size), F_Int16(iinfo.my), F_Int16(iinfo.mx), F_Int16(iinfo.sy), F_Int16(iinfo.sx), F_Padding(4), F_Int32(iinfo.format.size), F_Padding(32) )) mhni.add(ArtworkDBStringDataObject(3, ":" + iinfo.format.filename)) self.add(mhni) class ImageItemRecord(Record): def __init__(self, img_id, dbid, iinfo_list, orig_size=0): super().__init__( ( F_Tag(Tags.II), F_Int32(0x98), F_TotalLength(), F_ChildCount(), F_Int32(img_id), F_Int64(dbid), F_Padding(20), F_Int32(orig_size), F_Padding(100) )) for iinfo in iinfo_list: self.add(ImageDataObject(iinfo)) def ArtworkDB(model, imagelist, base_id=0x40, cache_data=({}, {})): while isinstance(ImageFormats.get(model, None), str): model = ImageFormats[model] if not model in ImageFormats: raise Exception format_cache, image_cache = cache_data formats = [] for descriptor in ImageFormats[model]: formats.append(ArtworkFormat(descriptor, cache_info = format_cache.get(descriptor[0], (0,0)))) # if there's at least one format whose image file isn't cache-clean, # invalidate the cache if not formats[-1].cache: image_cache = {} # Image List mhsd = Record(( F_Tag(Tags.SD), F_HeaderLength(), F_TotalLength(), F_Int32(1), F_Padding(80) )) mhli = Record(( F_Tag(Tags.LI), F_HeaderLength(), F_ChildCount(), F_Padding(80) )) img_id = base_id index = 0 output_image_cache = {} image_count = 0 dbid2mhii = {} for source, dbid_list in imagelist.items(): log(source, False) # stat this image try: s = os.stat(source) except OSError as e: log(" [Error: %s]\n" % e.strerror, True) continue # check if the image is cacheworthy cache_entry = image_cache.get(source, None) if cache_entry: if (cache_entry['size'] != s[stat.ST_SIZE]) \ or not(compare_mtime(cache_entry['mtime'], s[stat.ST_MTIME])): cache_entry = None # if it's not cached, open the image if not cache_entry: try: image = Image.open(source) image.tobytes() except IOError as e: log(" [Error: %s]\n" % e, True) continue else: log(" [cached]", True) image = None # generate the image data and ArtworkDB records iinfo_list = [format.GenerateImage(image, index, cache_entry) for format in formats] for dbid in dbid_list: mhli.add(ImageItemRecord(img_id, dbid, iinfo_list, s[stat.ST_SIZE])) dbid2mhii[dbid] = img_id img_id += 1 del image # add the image into the new cache dim = {} for iinfo in iinfo_list: dim[iinfo.format.fid] = { 'sx': iinfo.sx, 'sy': iinfo.sy, 'mx': iinfo.mx, 'my': iinfo.my } output_image_cache[source] = { 'index': index, 'size': s[stat.ST_SIZE], 'mtime': s[stat.ST_MTIME], 'dim': dim } # done with this image del iinfo_list index += 1 image_count += len(dbid_list) log(" [OK]\n", True) # Date File Header mhfd = Record(( F_Tag(Tags.FD), F_HeaderLength(), F_TotalLength(), F_Int32(0), F_Int32(2), F_Int32(3), F_Int32(0), F_Int32(base_id + image_count), F_Padding(16), F_Int32(2), F_Padding(80) )) mhsd.add(mhli) mhfd.add(mhsd) # Album List (dummy) mhsd = Record(( F_Tag(Tags.SD), F_HeaderLength(), F_TotalLength(), F_Int32(2), F_Padding(80) )) mhsd.add(Record(( F_Tag(Tags.LA), F_HeaderLength(), F_Int32(0), F_Padding(80) ))) mhfd.add(mhsd) # File List mhsd = Record(( F_Tag(Tags.SD), F_HeaderLength(), F_TotalLength(), F_Int32(3), F_Padding(80) )) mhlf = Record(( F_Tag(Tags.LF), F_HeaderLength(), F_Int32(len(formats)), F_Padding(80) )) for format in formats: mhlf.add(Record(( F_Tag(Tags.IF), F_HeaderLength(), F_TotalLength(), F_Int32(0), F_Int32(format.fid), F_Int32(format.size), F_Padding(100) ))) # finalize ArtworkDB mhsd.add(mhlf) mhfd.add(mhsd) output_format_cache = dict([format.close() for format in formats]) del formats output_cache_data = (output_format_cache, output_image_cache) return (str(mhfd), output_cache_data, dbid2mhii) ################################################################################ ## a rudimentary ITDB reader (only reads titles, no playlists, and isn't very ## ## fault-tolerant) for the "dissect" action ## ################################################################################ mhod_type_map = { 1: 'title', 2: 'path', 3: 'album', 4: 'artist', 5: 'genre', 6: 'filetype', 8: 'comment', 12: 'composer' } class InvalidFormat(Exception): pass class DatabaseReader: def __init__(self, f="iPod_Control/iTunes/iTunesDB"): if isinstance(f, str): f = open(f, "rb") self.f = f self._skip_header(Tags.BD) while True: h = self._skip_header(Tags.SD) if len(h) < 16: raise InvalidFormat size, mhsd_type = struct.unpack(' yeah! if size < len(h): raise InvalidFormat self.f.seek(size - len(h), 1) self._skip_header(Tags.LT) def _skip_header(self, tag: Tags): hh = self.f.read(8) if (len(hh) != 8) or (hh[:4] != tag.value.encode()): raise InvalidFormat size = struct.unpack(' 40) and (data[:4] == b"mhod"): size, mhod_type = struct.unpack('