Files
repear/repear.py

1464 lines
55 KiB
Python

#!/usr/bin/env python
#
# rePear, the iPod database management tool
# Copyright (C) 2006-2008 Martin J. Fiedler <martin.fiedler@gmx.net>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# --- program identity --------------------------------------------------------
__title__ = "rePear"
__version__ = "0.4.1"
# greeting line assembled from the title and version above
banner = "Welcome to %s, version %s" % (__title__, __version__)

# --- tunables and file locations ----------------------------------------------
# All paths are relative (goto_root_dir() changes into the iPod root directory);
# directory constants keep a trailing '/' so file names can simply be appended.
DISSECT_BASE_DIR = "Dissected Tracks/"  # destination prefix used by Dissect()
DEFAULT_LAME_OPTS = "--quiet -h -V 5"  # presumably the default for Options['lameopts'] -- set outside this section
MASTER_PLAYLIST_FILE = "repear_playlists.ini"  # read by parse_master_playlist_file()
SUPPORTED_FILE_FORMATS = (".mp3", ".ogg", ".m4a", ".m4b", ".mp4")  # lower-case extensions treated as music by freeze_dir()
MUSIC_DIR = "iPod_Control/Music/"
CONTROL_DIR = "iPod_Control/iTunes/"
ARTWORK_DIR = "iPod_Control/Artwork/"
DB_FILE = CONTROL_DIR + "iTunesDB"  # its presence is what goto_root_dir() uses to recognize an iPod root
CACHE_FILE = CONTROL_DIR + "repear.cache"  # pickled data handled by load_cache()/save_cache()
MODEL_FILE = CONTROL_DIR + "repear.model"
FWID_FILE = CONTROL_DIR + "fwid"
ARTWORK_CACHE_FILE = ARTWORK_DIR + "repear.artwork_cache"  # pickled artwork cache, see GenerateArtwork()
ARTWORK_DB_FILE = ARTWORK_DIR + "ArtworkDB"  # written by GenerateArtwork()
import sys, optparse, os, fnmatch, stat, string, pickle, random
import re, traceback, tempfile
from pathlib import Path
import warnings
from functools import cmp_to_key
def cmp(a, b):
    """Three-way comparison in the style of the Python 2 builtin.

    Returns a negative, zero or positive integer when ``a`` is less than,
    equal to or greater than ``b``.  Every call emits a warning.
    """
    warnings.warn("cmp was used.")
    greater = a > b
    less = a < b
    return greater - less  # Python3 replacement for cmp()
import iTunesDB, mp3info, hash58
# global option table; keys read in this file include 'log', 'root', 'force' and 'lameopts'
Options = {}
# becomes True as soon as writing to (or closing) the log file fails
broken_log = False
# directory of the running script, with forward slashes and a trailing '/'; set by goto_root_dir()
homedir = ""
# open log file object, or None while no log file is in use
logfile = None
def open_log():
    """Open the log file named by ``Options['log']`` for writing.

    The option is replaced by its absolute path.  If the file cannot be
    opened, ``logfile`` is set to None and logging goes to stdout only.
    """
    global logfile
    log_path = os.path.abspath(Options['log'])
    Options['log'] = log_path
    try:
        logfile = open(log_path, "w")
    except IOError:
        logfile = None
def log(line, flush=True):
    """Write ``line`` to stdout and, if one is open, to the log file.

    ``line`` is written verbatim (no newline is added).  With ``flush`` set,
    both streams are flushed right away.  A failing log file write only sets
    the ``broken_log`` flag.
    """
    global logfile, broken_log
    out = sys.stdout
    out.write(line)
    if flush:
        out.flush()
    if not logfile:
        return
    try:
        logfile.write(line)
        if flush:
            logfile.flush()
    except IOError:
        broken_log = True
# install our log() function as the `log` attribute of the iTunesDB module
iTunesDB.log = log
def quit(code=1):
    """Close the log file, report its location and exit with ``code``.

    The "Log written" notice is emitted after the file has been closed, so
    it only reaches stdout.  Always ends in ``sys.exit(code)``.
    """
    global logfile, broken_log
    if logfile:
        try:
            logfile.close()
        except IOError:
            broken_log = True
        logfile = None
        log("\nLog written to `%s'\n" % Options['log'])
    if broken_log:
        log("WARNING: there were errors while writing the log file\n")
    sys.exit(code)
def fatal(line):
    """Log ``line`` as a fatal error and terminate through quit()."""
    message = "FATAL: %s\n" % line
    log(message)
    quit()
def confirm(prompt):
    """Show ``prompt`` and ask whether to continue.

    Returns normally only when the user answers "y" or "yes" (case and
    surrounding whitespace ignored).  Any other answer, as well as an
    interrupted or failed read, aborts the program through quit().
    """
    sys.stdout.write("%sDo you really want to continue? (y/N) " % prompt)
    sys.stdout.flush()
    try:
        answer = input()
    except (IOError, EOFError, KeyboardInterrupt):
        answer = ""
    if answer.strip().lower() not in ("y", "yes"):
        log("Action aborted by user.\n")
        quit()
def goto_root_dir():
    """Locate the iPod root directory and make it the current directory.

    Sets the global ``homedir`` to the directory of the running script.
    The root is ``Options['root']`` when given; otherwise the current
    directory is tried first and the script's directory second.  A directory
    counts as an iPod root when it contains ``DB_FILE``.  Calls fatal() when
    no database is found or the directory cannot be entered.
    """
    global homedir

    def with_slash(path):
        # use forward slashes throughout and guarantee a trailing '/'
        path = path.replace("\\", "/")
        return path if path.endswith('/') else path + '/'

    homedir = with_slash(os.path.abspath(os.path.split(sys.argv[0])[0]))
    if Options['root']:
        rootdir = with_slash(Options['root'])
    else:
        # no root directory specified -- try the current directory
        rootdir = with_slash(os.getcwd())
        if not os.path.isfile(rootdir + DB_FILE):
            # not found? then try the executable's directory
            rootdir = homedir
            # special case on Windows: if the current directory doesn't contain
            # a valid iPod directory structure, reduce the pathname to the first
            # three characters, as in 'X:/', which is usually the root directory
            # NOTE(review): the nesting of this check was reconstructed from a
            # source without indentation -- confirm it belongs to this fallback
            if (os.name == 'nt') and not os.path.isfile(rootdir + DB_FILE):
                rootdir = rootdir[:3]
    if os.path.isfile(rootdir + DB_FILE):
        log("iPod root directory is `%s'\n" % rootdir)
    else:
        fatal("root directory `%s' contains no iPod database" % rootdir)
    try:
        os.chdir(rootdir)
    except OSError as e:
        fatal("can't change to the iPod root directory: %s" % e.strerror)
def load_cache(return_on_error=(None,None)):
    """Load and return the pickled content of ``CACHE_FILE``.

    Returns ``return_on_error`` when the file cannot be opened or unpickled.
    The file is closed on every path, including a failed load.
    """
    try:
        with open(CACHE_FILE, "rb") as f:
            # NOTE: unpickling can execute arbitrary code; the cache file is
            # assumed to have been written by save_cache()
            return pickle.load(f)
    except (IOError, EOFError, pickle.PickleError):
        return return_on_error
def save_cache(content=None):
    """Pickle ``content`` into ``CACHE_FILE``; failures are logged, not raised."""
    try:
        with open(CACHE_FILE, "wb") as cache_file:
            pickle.dump(content, cache_file)
    except (IOError, EOFError, pickle.PickleError):
        log("ERROR: can't save the rePear cache\n")
def execute(program, args):
    """Run an external program and wait for it to finish.

    On Windows the executable is taken from ``homedir`` (with ``.exe``
    appended) and every argument is wrapped in double quotes; elsewhere the
    program is looked up via os.spawnvp.

    Returns the value of the spawn call, -2 when interrupted from the
    keyboard, or None (after logging) when the program cannot be started.
    """
    global homedir
    if os.name == "nt":
        spawn = os.spawnv
        path = homedir + program + ".exe"
        args = ["\"%s\"" % arg for arg in args]
    else:
        spawn = os.spawnvp
        path = program
    argv = [program] + args
    try:
        return spawn(os.P_WAIT, path, argv)
    except OSError as e:
        log("ERROR: can't execute %s: %s\n" % (program, e.strerror))
        return None
    except KeyboardInterrupt:
        return -2
def printable(x, kill_chars=""):
    """Return ``str(x)`` with every character from ``kill_chars`` replaced by '_'.

    ``kill_chars`` is a string whose individual characters are the ones to
    replace.
    """
    text = str(x)
    return "".join("_" if ch in kill_chars else ch for ch in text)
def move_file(src: str | Path, dest: str | Path):
    """Rename ``src`` to ``dest`` without overwriting, logging the outcome.

    Missing directories above ``dest`` are created first, including several
    nested levels.

    Returns None on success, otherwise a short reason string: 'missing'
    (no source file), 'exists' (destination already present), 'mkdir'
    (directory creation failed) or 'move' (rename failed).
    """
    src = Path(src)
    dest = Path(dest)
    # check if source file exists
    if not src.is_file():
        log(f"[FAILED]\nERROR: source file `{printable(src)}' doesn't exist\n", True)
        return 'missing'
    # don't clobber files (wouldn't work on Windows anyway)
    if dest.is_file():
        log(f"[FAILED]\nERROR: destination file `{printable(dest)}' already exists\n", True)
        return 'exists'
    # create parent directories if necessary
    dest_dir = dest.parent
    if not dest_dir.is_dir():
        try:
            # parents=True: targets such as "<base>/<artist>/<album>/<file>"
            # need more than one directory level created at once
            dest_dir.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            log(f"[FAILED]\nERROR: can't create destination directory `{printable(dest_dir)}': {e.strerror}\n", True)
            return 'mkdir'
    # finally rename it
    try:
        src.rename(dest)
    except OSError as e:
        log(f" [FAILED]\nERROR: can't move `{printable(src)}' to `{printable(dest)}': {e.strerror}\n", True)
        return 'move'
    log("[OK]\n", True)
    return None
def backup(file: str | Path):
file = Path(file)
dest = file.parent / f"{file.name}.repear_backup"
if dest.exists(): return
try:
file.rename(dest)
return True
except OSError as e:
log(f"WARNING: Cannot backup `{file.name}': {e.strerror}\n")
return False
def delete(file: str | Path, may_fail=False):
file = Path(file)
if not file.exists(): return
try:
file.unlink()
return True
except OSError as e:
if not may_fail: log(f"ERROR: Cannot delete `{file.name}': {e.strerror}\n")
return False
class Logger:
    """Minimal write-only adapter that forwards text to log().

    The class itself is passed as the ``file`` argument of
    traceback.print_exc(), hence the static ``write`` method.
    """

    @staticmethod
    def write(s):
        log(s)
# path and file name sorting routines
# Splitting on this pattern keeps the digit runs (capturing group), so the
# result alternates between non-digit text and digit runs.
re_digit = re.compile(r'(\d+)')
def tryint(s):
    """Return ``int(s)`` if ``s`` parses as an integer, else ``s`` in lower case."""
    try:
        value = int(s)
    except ValueError:
        value = s.lower()
    return value
def fnrep(fn):
    """Return the sort key of a file name: its text and number runs as a tuple.

    Digit runs become integers and the remaining text is lower-cased.
    """
    return tuple(tryint(piece) for piece in re_digit.split(fn))
def fncmp(a, b):
    """Compare two file names by their fnrep() keys; returns -1, 0 or 1."""
    left, right = fnrep(a), fnrep(b)
    if left > right:
        return 1
    if left < right:
        return -1
    return 0
def pathcmp(a, b):
    """Compare two '/'-separated paths for sorting.

    The directory components the two paths have in common (by position) are
    compared with fncmp(); if those are equal the deeper path sorts first,
    and only then the last components are compared.
    """
    parts_a = a.split(u'/')
    parts_b = b.split(u'/')
    # compare base directories
    shared = min(len(parts_a), len(parts_b)) - 1
    for dir_a, dir_b in zip(parts_a[:shared], parts_b[:shared]):
        result = fncmp(dir_a, dir_b)
        if result:
            return result
    # subdirectories first
    depth_diff = len(parts_b) - len(parts_a)
    if depth_diff:
        return depth_diff
    # finally, compare leaf file name
    return fncmp(parts_a[-1], parts_b[-1])
def trackcmp(a, b):
    """Compare two track dicts by path using pathcmp().

    The 'original path' entry is preferred; 'path' is the fallback and
    '???' stands in when neither is usable.
    """
    path_a = a.get('original path', None) or a.get('path', '???')
    path_b = b.get('original path', None) or b.get('path', '???')
    return pathcmp(path_a, path_b)
class Allocator:
def __init__(self, root: Path | str, files_per_dir=100, max_dirs=100):
self.root = Path(root)
self.files_per_dir = files_per_dir
self.max_dirs = max_dirs
self.names = {}
self.files = {}
digits = []
try: dirs = self.root.iterdir()
except OSError:
self.root.mkdir()
dirs = []
for elem in dirs:
try: index = self.getindex(elem)
except ValueError: continue
self.names[index] = elem.name
self.files[index] = self.scandir(root / elem)
digits.append(len(elem.name) - 1)
if digits:
digits.sort()
self.fmt = "F%%0%dd" % (digits[len(digits) // 2])
else: self.fmt = "F%02d"
if not self.files: self.mkdir(0)
self.current_dir = min(self.files.keys())
def getindex(self, name):
if not name: raise ValueError
name = Path(name)
if name.name[0].upper() != 'F': raise ValueError
return int(name.name[1:], 10)
def scandir(self, root: str | Path):
try: dir_contents = Path(root).iterdir()
except OSError: return {}
dir_contents = [os.path.splitext(x.name)[0].upper() for x in dir_contents if x.name[0] != '.']
return dict(zip(dir_contents, [None] * len(dir_contents)))
def __len__(self): return sum(map(len, self.files.values()))
def __repr__(self): return "<Allocator: %d files in %d directories>" % (len(self), len(self.files))
def allocate_ex(self, index):
while True:
name = "".join([random.choice(string.ascii_uppercase) for x in range(4)])
if not(name in self.files[index]): break
self.files[index][name] = None
return self.names[index] + '/' + name
def mkdir(self, index):
if index in self.files: return
name = self.fmt % index
try: Path(self.root, name).mkdir()
except OSError: pass
self.names[index] = name
self.files[index] = {}
def allocate(self):
count, index = min([(len(d[1]), d[0]) for d in self.files.items()])
# need to allocate a new directory
if (count >= self.files_per_dir) and (len(self.files) < self.max_dirs):
available = [i for i in range(self.max_dirs) if not i in self.files]
index = available[0]
self.mkdir(index)
# generate a file name
while True:
name = "".join([random.choice(string.ascii_uppercase) for x in range(4)])
if not(name in self.files[index]): break
self.files[index][name] = None
return str(self.root) + '/' + self.names[index] + '/' + name
def add(self, fullname):
try:
dirname, filename = fullname.split('/')[-2:]
index = self.getindex(dirname)
except ValueError: return
filename = os.path.splitext(filename)[0]
if not index in self.files:
self.names[index] = dirname
self.files[index] = {}
self.files[index][filename] = None
class BalancedShuffle:
    """Shuffle that interleaves items from different directories.

    Items are filed into a tree that mirrors their directory path.  Each
    tree node is a dict: the key None holds the list of items stored directly
    in that directory, every other key (a lower-case bytes path component)
    holds the child node of a subdirectory.
    """

    def __init__(self):
        self.root = {None: []}

    def add(self, path, data):
        """File ``data`` under the directory part of ``path``.

        ``path`` may be str or bytes and use either slash; it is compared
        case-insensitively.  The last component (the file name) is ignored.
        """
        if isinstance(path, str):
            path = path.encode('ascii', 'replace')
        path = path.replace(b"\\", b"/").lower().split(b"/")
        if path and not path[0]:
            path.pop(0)
        if not path:
            return  # broken path
        root = self.root
        while True:
            if len(path) == 1:
                # tail reached
                root[None].append(data)
                break
            component = path.pop(0)
            if component not in root:
                root[component] = {None: []}
            root = root[component]

    def shuffle(self, root=None):
        """Return all items below ``root`` (default: the whole tree) as a shuffled list."""
        if not root:
            root = self.root
        # shuffle the files of the root node
        random.shuffle(root[None])
        # build a list of directories to shuffle; compare against None so that
        # a subtree filed under an empty component (as in "a//b") is kept
        subdirs = [root[None]] + [self.shuffle(root[key]) for key in root if key is not None]
        subdirs = [sd for sd in subdirs if sd]
        # check for "tail" cases
        if not subdirs:
            return []
        if len(subdirs) == 1:
            return subdirs[0]
        # pad subdirectory list to a common length
        dircount = len(subdirs)
        maxlen = max(map(len, subdirs))
        subdirs = [self.fill(sd, maxlen) for sd in subdirs]
        # collect all items
        res = []
        last = -1
        for i in range(maxlen):
            # determine the directory order for this "column"
            order = list(range(dircount))
            random.shuffle(order)
            if (len(order) > 1) and (order[0] == last):
                order.append(order.pop(0))
            # reshuffle until this column does not start with the directory
            # the previous column ended with
            while len(order) > 1:
                random.shuffle(order)
                if last != order[0]:
                    break
            last = order[-1]
            # produce a result
            res.extend(x for x in (subdirs[j][i] for j in order) if x is not None)
        return res

    def fill(self, data, total):
        """Spread ``data`` over a list of length ``total``, padding with None.

        The items keep their order; their positions are chosen at random but
        roughly evenly spaced.  Returns a real list so callers can index it.
        """
        ones = len(data)
        invert = (ones > (total / 2))
        if invert:
            ones = total - ones
        bitmap = [0] * total
        remain = total
        for fraction in range(ones, 0, -1):
            bitmap[total - remain] = 1
            skip = float(remain) / fraction
            skip = random.randrange(int(0.9 * skip), int(1.1 * skip) + 2)
            remain -= min(max(1, skip), remain - fraction + 1)
        if invert:
            bitmap = [1 - x for x in bitmap]
        offset = random.randrange(0, total)
        bitmap = bitmap[offset:] + bitmap[:offset]
        # the bitmap holds exactly len(data) ones, so the iterator never runs dry
        items = iter(data)
        return [next(items) if bit else None for bit in bitmap]
def ImportPlayCounts(cache, index):
    """Merge the iPod's play count data into the cached track entries.

    ``cache`` is the list of track dicts and ``index`` maps lower-case paths
    to positions in it.  Play and skip counts are added to the stored values;
    last-played/last-skipped times, bookmark and rating replace them.

    Returns the number of updated tracks, or False when one of the files
    cannot be read or the two files disagree on the number of entries.
    """
    log("Updating play counts and ratings ... ", True)

    # open Play Counts file
    try:
        pc = iTunesDB.PlayCountsReader()
    except IOError:
        log("\n0 track(s) updated.\n")
        return False
    except iTunesDB.InvalidFormat:
        log("\n-- Error in Play Counts file, import failed.\n")
        return False

    # parse old iTunesDB
    try:
        db = iTunesDB.DatabaseReader()
        files = [printable(item.get('path', u'??')[1:].replace(u':', u'/')).lower() for item in db]
        db.f.close()
        del db
    except (IOError, iTunesDB.InvalidFormat):
        log("\n-- Error in iTunesDB, import failed.\n")
        return False

    # plausability check
    if len(files) != pc.entry_count:
        log("\n-- Mismatch between iTunesDB and Play Counts file, import failed.\n")
        return False

    # walk through Play Counts file
    update_count = 0
    try:
        for item in pc:
            path = files[item.index]
            try:
                track = cache[index[path]]
            except (KeyError, IndexError):
                continue
            touched = False
            if item.play_count:
                track['play count'] = track.get('play count', 0) + item.play_count
                touched = True
            if item.last_played:
                track['last played time'] = item.last_played
                touched = True
            if item.skip_count:
                track['skip count'] = track.get('skip count', 0) + item.skip_count
                touched = True
            if item.last_skipped:
                track['last skipped time'] = item.last_skipped
                touched = True
            if item.bookmark:
                track['bookmark time'] = item.bookmark * 0.001
                touched = True
            if item.rating:
                track['rating'] = item.rating
                touched = True
            if touched:
                update_count += 1
        pc.f.close()
        del pc
    except (IOError, iTunesDB.InvalidFormat):
        log("\n-- Error in Play Counts file, import failed.\n")
        return False
    log("%d track(s) updated.\n" % update_count)
    return update_count
def Dissect():
    """Move every track listed in the iTunesDB into a readable directory tree.

    Each file goes to ``DISSECT_BASE_DIR/<artist>/<album>/<nn> - <title><ext>``
    (parts without tag information are left out); a " (n)" suffix avoids
    name collisions.  Afterwards the cache is saved in the "unfrozen" state
    with one placeholder entry per moved file.  Unless ``Options['force']``
    is set, the user is asked for confirmation depending on the cache state.
    """
    unsafe_chars = "<>/\\:|?*\""
    state, cache = load_cache((None, None))
    if (state is not None) and not Options['force']:
        if state == "frozen":
            confirm("\nWARNING: This action will put all the music files on your iPod into a completely new directory structure. All previous file and directory names will be lost. This also means that any iTunesDB backups you have will NOT work any longer!\n")
        if state == "unfrozen":
            confirm("\nWARNING: The database is currently unfrozen, so the following operations will almost completely fail.\n")
    cache = []
    try:
        db = iTunesDB.DatabaseReader()
        for info in db:
            if not info.get('path', None):
                log("ERROR: track lacks path attribute\n")
                continue
            # database paths start with a separator and use ':' between components
            src = printable(info['path'])[1:].replace(":", "/")
            if not os.path.isfile(src):
                log("ERROR: file `%s' is found in database, but doesn't exist\n" % src)
                continue
            if not info.get('title', None):
                info.update(iTunesDB.GuessTitleAndArtist(info['path']))
            ext = os.path.splitext(src)[1]
            base = DISSECT_BASE_DIR
            if info.get('artist', None):
                base += printable(info['artist'], unsafe_chars) + '/'
            if info.get('album', None):
                base += printable(info['album'], unsafe_chars) + '/'
            if info.get('track number', None):
                base += "%02d - " % info['track number']
            base += printable(info['title'], unsafe_chars)
            # move the file, but avoid filename collisions
            serial = 1
            dest = base + ext
            while os.path.exists(dest):
                serial += 1
                dest = base + " (%d)" % serial + ext
            log("%s => %s " % (src, dest), True)
            if move_file(src, dest):
                continue  # move failed
            # create a placeholder cache entry
            cache.append({'path': src, 'original path': dest})
    except IOError:
        fatal("can't read iTunes database file")
    except iTunesDB.InvalidFormat:
        fatal("invalid iTunes database format")
        raise
    # clear the cache
    save_cache(("unfrozen", cache))
# number of errors encountered so far; incremented by move_music() and freeze_dir()
g_freeze_error_count = 0
def check_file(base, fn):
    """Describe the directory entry ``fn`` inside ``base`` for freeze_dir().

    Returns None for dot-entries, inaccessible entries and anything that is
    neither a directory nor a regular file.  Otherwise returns the tuple
    ``(isfile, sortkey, fullname, statresult, ext, key)`` where ``isfile`` is
    1 for files, 0 for directories and -1 for the iPod_Control directories,
    and ``ext``/``key`` are the lower-cased extension and base name.
    """
    if fn.startswith('.'):
        return None  # skip dot-files and -directories
    key, ext = [component.lower() for component in os.path.splitext(fn)]
    fullname = base + fn
    try:
        s = os.stat(fullname)
    except OSError:
        log("ERROR: directory entry `%s' is inaccessible\n" % fn)
        return None
    mode = s[stat.ST_MODE]
    if stat.S_ISDIR(mode):
        if fullname in ("iPod_Control", "iPod_Control/Music"):
            isfile = -1  # trick the sort algorithm to move iPC/Music to front
        else:
            isfile = 0
    elif stat.S_ISREG(mode):
        isfile = 1
    else:
        return None  # no directory and no normal file -> skip this crap
    return (isfile, fnrep(fn), fullname, s, ext, key)
def make_cache_index(cache):
    """Build a dict mapping lower-case paths to positions in ``cache``.

    Both the 'path' and the 'original path' of every entry are indexed.
    A path that occurs more than once is reported and keeps its first
    position.
    """
    index = {}
    for position, entry in enumerate(cache):
        for field in ('path', 'original path'):
            if field not in entry:
                continue
            path = entry[field]
            key = printable(path).lower()
            if key in index:
                log("ERROR: `%s' is cached multiple times\n" % printable(path))
            else:
                index[key] = position
    return index
def find_in_cache(cache, index, path, s):
    """Look up ``path`` in the cache and check it against stat result ``s``.

    Returns ``(True, info)`` when an entry exists and its size and
    modification time match, ``(False, info)`` when an entry exists but does
    not match, and ``(False, None)`` when the path is not cached.
    """
    position = index.get(printable(path).lower(), None)
    if position is None:
        return (False, None)  # not found
    info = cache[position]
    # check size and modification time
    same_size = info.get('size', None) == s[stat.ST_SIZE]
    if not same_size:
        return (False, info)  # mismatch
    if not iTunesDB.compare_mtime(info.get('mtime', 0), s[stat.ST_MTIME]):
        return (False, info)  # mismatch
    # all checks passed => correct file
    return (True, info)
def move_music(src, dest, info):
    """Move a music file to ``dest``, transcoding Ogg files to MP3 on the way.

    Returns the track info dict to store for the file (a freshly scanned one
    after transcoding), or None when the operation failed.  Every failure
    increments ``g_freeze_error_count``.
    """
    global g_freeze_error_count
    format = info.get('format', "mp3-cbr")
    if format == "ogg":
        return _transcode_ogg(src, dest, info)
    # no Ogg file -> move directly
    if move_file(src, dest):
        g_freeze_error_count += 1
        return None  # failed
    return info


def _transcode_ogg(src, dest, info):
    """Decode ``src`` with oggdec, encode it to ``dest`` (as .mp3) with lame and delete ``src``."""
    global g_freeze_error_count
    src = printable(src)
    dest = os.path.splitext(printable(dest))[0] + ".mp3"
    # generate new source filename (replace .ogg by .mp3)
    newsrc = info.get('original path', src)
    if not isinstance(newsrc, str):
        newsrc = str(newsrc, sys.getfilesystemencoding(), 'replace')
    newsrc = u'.'.join(newsrc.split(u'.')[:-1]) + u'.mp3'
    # The temporary WAV file is created closed: the external tools must be
    # able to open it, and we remove it ourselves when done.
    fd, wav_name = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        # decode the Ogg file
        res = execute("oggdec", ["-Q", "-o", wav_name, src])
        if res != 0:
            g_freeze_error_count += 1
            log("[FAILED]\nERROR: cannot execute OggDec ... result '%s'\n" % res)
            return None
        log("[decoded] ", True)
        # build LAME option list
        lameopts = Options['lameopts'].split(' ')
        for key, optn in (('title','tt'), ('artist','ta'), ('album','tl'), ('year','ty'), ('comment','tc'), ('track number','tn')):
            if key in info:
                lameopts.extend(["--" + optn, printable(info[key])])
        if 'genre' in info:
            ref_genre = printable(info['genre']).lower().replace(" ", "")
            for number, genre in mp3info.ID3v1Genres.items():
                if genre.lower().replace(" ", "") == ref_genre:
                    lameopts.extend(["--tg", str(number)])
                    break
        # encode to MP3
        res = execute("lame", lameopts + [wav_name, dest])
    finally:
        delete(wav_name)
    if res != 0:
        g_freeze_error_count += 1
        # %s rather than %d: execute() returns None when LAME cannot be started
        log("[FAILED]\nERROR: cannot execute LAME ... result code %s\n" % res)
        return None
    log("[encoded] ", True)
    # check the resulting file
    info = mp3info.GetAudioFileInfo(dest)
    if not info:
        g_freeze_error_count += 1
        log("[FAILED]\nERROR: generated MP3 file is invalid\n")
        delete(dest)
        return None
    delete(src)
    info['original path'] = newsrc
    info['changed'] = 2
    log("[OK]\n", True)
    return info
def freeze_dir(cache, index, allocator: Allocator, playlists=None, base="", artwork=None):
    """Recursively collect (and move into place) all music files below ``base``.

    ``cache``/``index`` are the track cache and its path index, ``allocator``
    provides target names for files that are not yet inside ``MUSIC_DIR``.
    ``playlists`` is a list that receives the full names of all ``.m3u``
    files found (pass your own list to collect them).  ``base`` is either
    empty or a directory path ending in '/'; ``artwork`` is the artwork file
    inherited from the parent directory.

    Returns the list of track info dicts, subdirectories first.  Errors are
    logged and counted in ``g_freeze_error_count``.
    """
    global g_freeze_error_count
    if playlists is None:
        playlists = []
    try:
        flist = [entry for entry in (check_file(base, fn) for fn in os.listdir(base or ".")) if entry]
    except Exception:
        g_freeze_error_count += 1
        log(base + "/\n" + " runtime error, traceback follows ".center(79, '-') + "\n")
        traceback.print_exc(file=Logger)
        log(79*'-' + "\n")
        return []

    # generate directory list
    directories = sorted(entry for entry in flist if entry[0] < 1)
    # add playlist files
    playlists.extend(entry[2] for entry in flist if (entry[0] > 0) and (entry[4] == ".m3u"))
    # generate music file list
    music = sorted(entry for entry in flist if (entry[0] > 0) and (entry[4] in SUPPORTED_FILE_FORMATS))
    # if there are no subdirs and no music files here, prune this directory
    if not directories and not music:
        return []

    # generate name -> artwork file associations
    image_assoc = {entry[5]: entry[2] for entry in flist if (entry[0] > 0) and (entry[4] in (".jpg", ".png"))}
    # find artwork files that are not associated to a file or directory
    claimed = {entry[5] for entry in directories} | {entry[5] for entry in music}
    unassoc_images = sorted(path for key, path in image_assoc.items() if key not in claimed)
    # use one of the unassociated artwork files as this directory's artwork,
    # unless the inherited artwork file name is already a perfect match (i.e.
    # the directory name and the artwork name are identical)
    if unassoc_images and (not artwork or not artwork.lower().startswith(base[:-1].lower())):
        artwork = find_good_artwork(unassoc_images, base)

    # now that the artwork problem is solved, we start processing:
    # recurse into subdirectories first
    res = []
    for entry in directories:
        res.extend(freeze_dir(cache, index, allocator, playlists, entry[2] + '/', artwork))

    # now process the local files
    local_tracks = []
    unique_artist = None
    unique_album = None
    for isfile, dummy, fullname, s, ext, key in music:
        try:
            # we don't need to move this file if it's already in the Music directory
            already_there = fullname.startswith(MUSIC_DIR)
            # is this track cached?
            log(fullname + ' ', True)
            valid, info = find_in_cache(cache, index, fullname, s)
            if valid:
                info['changed'] = 0
                log("[cached] ", True)
            else:
                if info:
                    # cache entry present, but invalid => save iPod_Control location
                    path = info['path']
                    changed = 1
                else:
                    path = fullname
                    changed = 2
                info = mp3info.GetAudioFileInfo(fullname)
                if not info:
                    # handled (logged and counted) by the except clause below
                    raise ValueError("no audio information available for `%s'" % fullname)
                iTunesDB.FillMissingTitleAndArtist(info)
                info['changed'] = changed
                if not already_there:
                    if isinstance(info['path'], str):
                        info['original path'] = info['path']
                    else:
                        info['original path'] = str(info['path'], sys.getfilesystemencoding(), 'replace')
                info['path'] = path
            # move the track to where it belongs
            if not already_there:
                path = info.get('path', None)
                if not path or os.path.exists(path) or not os.path.isdir(os.path.split(path)[0]):
                    # if anything is wrong with the path, generate a new one
                    path = allocator.allocate() + ext
                else:
                    allocator.add(path)
                info['path'] = path
                info = move_music(fullname, path, info)
                if not info:
                    continue  # something failed
            else:
                allocator.add(fullname)
                log("[OK]\n", True)
            # associate artwork to the track
            info['artwork'] = image_assoc.get(key, artwork)
            # check for unique artist and album
            check = info.get('artist', None)
            if not local_tracks:
                unique_artist = check
            elif check != unique_artist:
                unique_artist = False
            check = info.get('album', None)
            if not local_tracks:
                unique_album = check
            elif check != unique_album:
                unique_album = False
            # finally, append the track to the track list
            local_tracks.append(info)
        except KeyboardInterrupt:
            log("\nInterrupted by user.\nContinue with next file or abort? [c/A] ")
            try:
                answer = input()
            except (IOError, EOFError, KeyboardInterrupt):
                answer = ""
            if not answer.lower().startswith("c"):
                raise
        except Exception:
            # Exception (not a bare except) so that a program exit requested
            # further down is not swallowed here
            g_freeze_error_count += 1
            log("\n" + " runtime error, traceback follows ".center(79, '-') + "\n")
            traceback.print_exc(file=Logger)
            log(79*'-' + "\n")

    # if all files in this directory share the same album title, but differ
    # in the artist name, we assume it's a compilation
    if unique_album and not unique_artist:
        for info in local_tracks:
            info['compilation'] = 1
    # combine the lists and return them
    res.extend(local_tracks)
    return res
def cmp_lst(a, b, order, empty_pos):
    """Compare two tracks by the time they were last started.

    The time is the later of 'last played time' and 'last skipped time'.
    Tracks without such a time are placed according to ``empty_pos``;
    ``order`` (1 or -1) selects ascending or descending order otherwise.
    """
    stamp_a = max(a.get('last played time', 0), a.get('last skipped time', 0))
    stamp_b = max(b.get('last played time', 0), b.get('last skipped time', 0))
    if stamp_a and stamp_b:
        return order * cmp(stamp_a, stamp_b)
    if stamp_a:
        return -empty_pos  # only b lacks a time
    if stamp_b:
        return empty_pos  # only a lacks a time
    return 0
def cmp_path(a, b, order, empty_pos):
    """Sort criterion: compare two tracks by path via trackcmp().

    ``empty_pos`` is accepted for signature compatibility and not used.
    """
    return order * trackcmp(a, b)
class cmp_key:
    """Sort criterion comparing two track dicts by one of their fields.

    Strings (and bytes) are compared case-insensitively.  A track lacking
    the field is placed according to ``empty_pos``.
    """

    def __init__(self, key):
        self.key = key

    def __repr__(self):
        return "%s(%r)" % (type(self).__name__, self.key)

    def __call__(self, a, b, order, empty_pos):
        in_a = self.key in a
        in_b = self.key in b
        if not in_a:
            return empty_pos if in_b else 0
        if not in_b:
            return -empty_pos
        value_a = a[self.key]
        if isinstance(value_a, (str, bytes)):
            value_a = value_a.lower()
        value_b = b[self.key]
        if isinstance(value_b, (str, bytes)):
            value_b = value_b.lower()
        return order * cmp(value_a, value_b)
# Sort criteria by normalized name (lower case, no spaces or underscores).
# A value is either a comparison callable taking (a, b, order, empty_pos) or
# the name of a track field to compare by.
sort_criteria = {
    'playcount': lambda a,b,o,e: o*cmp(a.get('play count', 0), b.get('play count', 0)),
    'skipcount': lambda a,b,o,e: o*cmp(a.get('skip count', 0), b.get('skip count', 0)),
    'startcount': lambda a,b,o,e: o*cmp(a.get('play count', 0) + a.get('skip count', 0), b.get('play count', 0) + b.get('skip count', 0)),
    'artworkcount': lambda a,b,o,e: o*cmp(a.get('artwork count', 0), b.get('artwork count', 0)),
    'laststartedtime': cmp_lst,
    'laststarttime': cmp_lst,
    'lastplaytime': 'last played time',
    'lastskiptime': 'last skipped time',
    'movie': 'movie flag',
    'filesize': 'size',
    'path': cmp_path,
}
# Every plain field name is a criterion of its own.  setdefault() keeps the
# dedicated comparison functions registered above; in particular 'path' stays
# bound to cmp_path instead of being replaced by a plain field lookup.
for nc in ('title', 'artist', 'album', 'compilation', 'rating', 'path', 'length', 'size', 'track number', 'year', 'bitrate', 'sample rate', 'volume',
           'last played time', 'last skipped time', 'mtime', 'disc number', 'total discs', 'BPM', 'movie flag'):
    sort_criteria.setdefault(nc.replace(' ', '').lower(), nc)
# one sort criterion: optional leading modifiers (any of < > + -), the
# criterion name, optional trailing modifiers
re_sortspec = re.compile(r'^([<>+-]*)(.*?)([<>+-]*)$')
# raised by SortSpec for an unknown or malformed sort criterion
class SSParseError(Exception): pass
class SortSpec:
    """An ordered list of sort criteria that can sort a list of track dicts.

    ``criteria`` is a list of ``(compare, order, empty_pos)`` tuples where
    ``compare`` is called as ``compare(a, b, order, empty_pos)``.
    """

    def __init__(self, pattern=None):
        """Create an empty specification or parse ``pattern`` (see parse())."""
        self.criteria = []
        if pattern:
            self.parse(pattern)

    def parse(self, pattern):
        """Replace the criteria by those in the comma-separated ``pattern``.

        Raises SSParseError for an unknown or malformed criterion.
        """
        parsed = [self._parse_criterion(part) for part in pattern.split(',')]
        # a real list: the criteria are counted, concatenated and walked
        # through once per comparison
        self.criteria = [criterion for criterion in parsed if criterion]

    def _parse_criterion(self, text):
        """Parse one criterion; returns its tuple, or None for empty text.

        The modifiers may precede or follow the name: '-' selects descending
        order, '<' changes where tracks lacking the value are placed.
        """
        text = text.strip()
        if not text:
            return None
        m = re_sortspec.match(text)
        if not m:
            raise SSParseError("invalid sort criterion `%s'" % text)
        text = m.group(2).strip()
        key = text.lower().replace('_', '').replace(' ', '')
        try:
            criterion = sort_criteria[key]
        except KeyError:
            raise SSParseError("unknown sort criterion `%s'" % text)
        if isinstance(criterion, (str, bytes)):
            # a plain field name: compare by that field
            criterion = cmp_key(criterion)
        modifiers = m.group(1) + m.group(3)
        order = 1
        if '-' in modifiers:
            order = -1
        empty_pos = -1
        if '<' in modifiers:
            empty_pos = 1
        return (criterion, order, empty_pos)

    def _cmp(self, a, b):
        """Compare the tracks at positions ``a`` and ``b`` of ``self.tracks``."""
        for cmp_func, order, empty_pos in self.criteria:
            res = cmp_func(self.tracks[a], self.tracks[b], order, empty_pos)
            if res:
                return res
        # all criteria equal: fall back to the original position
        return cmp(a, b)

    def sort(self, tracks):
        """Return a new list with ``tracks`` sorted by the criteria."""
        self.tracks = tracks
        try:
            index = sorted(range(len(tracks)), key=cmp_to_key(self._cmp))
        finally:
            del self.tracks
        return [tracks[i] for i in index]

    def __add__(self, other):
        """Append the criteria of ``other`` to this object and return it.

        Note that the left operand is modified in place.
        """
        self.criteria += other.criteria
        return self

    def __len__(self):
        return len(self.criteria)
def add_scripted_playlist(db, tracklist, list_name, include, exclude, shuffle=False, changemask=0, sort=None):
    """Build a playlist from filename patterns and add it to ``db``.

    A track is selected when its lower-cased 'original path' matches one of
    the ``include`` patterns, or when its 'changed' value has a bit in common
    with ``changemask``; a match against ``exclude`` removes it again.
    ``shuffle`` is 0 (keep order), 1 (balanced shuffle) or 2 (plain random
    shuffle); ``sort`` is an optional SortSpec applied last.  Nothing is
    added when no track is selected.
    """
    if not list_name or not (include or changemask) or not tracklist:
        return
    tracks = []
    log("Processing playlist `%s': " % (list_name), True)
    for track in tracklist:
        if 'original path' not in track:
            continue  # we don't know the real name of this file, so skip it
        # name and patterns must both be text for fnmatch
        name = track['original path']
        if isinstance(name, bytes):
            name = name.decode(sys.getfilesystemencoding(), 'replace')
        name = name.lower()
        ok = changemask & track.get('changed', 0)
        if any(fnmatch.fnmatch(name, pattern) for pattern in include):
            ok = True
        if any(fnmatch.fnmatch(name, pattern) for pattern in exclude):
            ok = False
        if ok:
            tracks.append(track)
    log(f"{len(tracks)} tracks\n")
    if not tracks:
        return
    if shuffle == 1:
        shuffler = BalancedShuffle()
        for info in tracks:
            shuffler.add(info.get('original path', None) or info.get('path', "???"), info)
        tracks = shuffler.shuffle()
    elif shuffle == 2:
        random.shuffle(tracks)
    if sort:
        tracks = sort.sort(tracks)
    db.add_playlist(tracks, list_name)
def process_m3u(db, tracklist, index, filename, skip_album_playlists):
    """Turn the M3U file ``filename`` into a playlist in ``db``.

    Entries are resolved relative to the playlist's directory and looked up
    (lower-cased) in ``index``, which maps paths to positions in
    ``tracklist``; unknown entries are skipped.  With
    ``skip_album_playlists`` set, a playlist that merely lists one complete
    album is discarded.  The playlist is named after the file (without
    extension).
    """
    if not filename or not tracklist:
        return
    basedir, list_name = os.path.split(filename)
    list_name = os.path.splitext(list_name)[0]
    if isinstance(list_name, bytes):
        list_name = str(list_name, sys.getfilesystemencoding(), 'replace')
    log("Processing playlist `%s': " % (list_name), True)
    try:
        f = open(filename, "r")
    except IOError as e:
        log("ERROR: cannot open `%s': %s\n" % (filename, e.strerror))
        return
    tracks = []
    # collect all tracks
    with f:
        for line in f:
            line = line.strip()
            if line.startswith('#'):
                continue  # comment or EXTM3U line
            line = os.path.normpath(os.path.join(basedir, line)).replace("\\", "/").lower()
            try:
                tracks.append(tracklist[index[line]])
            except KeyError:
                continue  # file not found -> sad, but not fatal
    # check if it's an album playlist
    if skip_album_playlists:
        ref_album = None
        for info in tracks:
            if 'album' not in info:
                continue
            if not ref_album:
                ref_album = info['album']
            elif info['album'] != ref_album:
                single_album = False  # "this playlist is mixed-album, so it's clean"
                break
        else:
            single_album = True  # "all known tracks are from the same album, how sad"
        if single_album:
            # now check if this playlist really covers the _whole_ album:
            # count down once for every track of that album in the collection
            remaining = len(tracks)
            for info in tracklist:
                try:
                    if info.get('album', None) == ref_album:
                        remaining -= 1
                        if remaining < 0:
                            break
                except (TypeError, UnicodeDecodeError):
                    # old (<0.3.0) cache files contain non-unicode information
                    # for ID3v1 tags which can cause trouble here, so ...
                    continue
            if not remaining:
                log("album playlist, discarding.\n")
                return
    # finish everything
    log("%d tracks\n" % len(tracks))
    if not tracks:
        return
    db.add_playlist(tracks, list_name)
def make_directory_playlists(db, tracklist):
    """Add one playlist per directory name occurring in the tracks' original paths.

    Every directory component of a track's 'original path' puts the track
    into the playlist of that name; tracks without an original path are
    skipped.  Playlists are created in natural name order with their tracks
    sorted by path.
    """
    log("Processing directory playlists ...\n")
    dirs = {}
    for track in tracklist:
        path = track.get('original path', None)
        if not path:
            continue
        for dirname in path.split('/')[:-1]:
            if not dirname:
                continue
            dirs.setdefault(dirname, []).append(track)
    for dirname in sorted(dirs, key=cmp_to_key(fncmp)):
        log("Processing playlist `%s': " % dirname, True)
        tracks = dirs[dirname]
        # list.sort() takes no comparison function; wrap it as a key
        tracks.sort(key=cmp_to_key(trackcmp))
        log("%d tracks\n" % len(tracks))
        db.add_playlist(tracks, dirname)
# Accepted values of the playlist `shuffle' option and the mode they select:
# 0 = keep order, 1 = balanced shuffle, 2 = plain random shuffle
shuffle_options = {
    "0": 0, "no": 0, "off": 0, "false": 0, "disabled": 0, "none": 0,
    "1": 1, "yes": 1, "on": 1, "true": 1, "enabled": 1, "balanced": 1,
    "2": 2, "random": 2, "standard": 2,
}
def parse_master_playlist_file():
    """Read ``MASTER_PLAYLIST_FILE`` and return the playlist configuration.

    The file consists of ``[playlist name]`` section headers and
    ``key = value`` lines; everything after a ';' is a comment.

    Returns ``(skip_album_playlists, directory_playlists, lists)`` where
    ``lists`` holds one ``(name, include, exclude, shuffle, changemask,
    sort)`` tuple per playlist that has at least one include pattern or a
    non-zero change mask.  A missing file yields the defaults.
    """
    # helper function
    def yesno(s):
        if s.lower() in ('true', 'enable', 'enabled', 'yes', 'y'):
            return 1
        try:
            return (int(s) != 0)
        except ValueError:
            return 0

    # default values
    skip_album_playlists = True
    directory_playlists = False
    lists = []
    # now we're parsing
    try:
        f = open(MASTER_PLAYLIST_FILE, "r")
    except IOError:
        return (skip_album_playlists, directory_playlists, lists)
    include = []
    exclude = []
    list_name = None
    shuffle = 0
    changemask = 0
    sort = SortSpec()
    with f:
        for lineno, line in enumerate(f, 1):
            line = line.split(';', 1)[0].strip()
            if not line:
                continue
            if (line[0] == '[') and (line[-1] == ']'):
                # a new section begins: store the one collected so far
                if list_name and (include or changemask):
                    lists.append((list_name, include, exclude, shuffle, changemask, sort))
                include = []
                exclude = []
                list_name = line[1:-1]
                shuffle = False
                changemask = 0
                sort = SortSpec()
                continue
            try:
                # split at the first '=' only, so values may contain '='
                key, value = [x.strip().replace("\\", "/") for x in line.split('=', 1)]
            except ValueError:
                continue
            key = key.lower().replace(' ', '_')
            if not value:
                log("WARNING: In %s:%d: key `%s' without a value\n" % (MASTER_PLAYLIST_FILE, lineno, key))
                continue
            if key == "skip_album_playlists":
                if list_name:
                    log("WARNING: In %s:%d: global option `%s' inside a playlist\n" % (MASTER_PLAYLIST_FILE, lineno, key))
                skip_album_playlists = yesno(value)
            elif key == "directory_playlists":
                if list_name:
                    log("WARNING: In %s:%d: global option `%s' inside a playlist\n" % (MASTER_PLAYLIST_FILE, lineno, key))
                directory_playlists = yesno(value)
            elif key == "shuffle":
                try:
                    shuffle = shuffle_options[value.lower()]
                except KeyError:
                    log("WARNING: In %s:%d: invalid value `%s' for shuffle option\n" % (MASTER_PLAYLIST_FILE, lineno, value))
            elif key == "new":
                changemask = (changemask & (~2)) | (yesno(value) << 1)
            elif key == "changed":
                changemask = (changemask & (~1)) | yesno(value)
            elif key == "sort":
                try:
                    # criteria of a later `sort' line take precedence
                    sort = SortSpec(value) + sort
                except SSParseError as e:
                    log("WARNING: In %s:%d: %s\n" % (MASTER_PLAYLIST_FILE, lineno, e))
            elif key in ("include", "exclude"):
                if value[0] == "/":
                    value = value[1:]
                if os.path.isdir(value):
                    # a directory stands for everything inside it
                    if value[-1] != "/":
                        value += "/"
                    value += "*"
                if key == "include":
                    include.append(value.lower())
                else:
                    exclude.append(value.lower())
            else:
                log("WARNING: In %s:%d: unknown key `%s'\n" % (MASTER_PLAYLIST_FILE, lineno, key))
    if list_name and (include or changemask):
        lists.append((list_name, include, exclude, shuffle, changemask, sort))
    return (skip_album_playlists, directory_playlists, lists)
# the words "cover" and "front", not embedded in a longer lower-case word
re_cover = re.compile(r'[^a-z]cover[^a-z]')
re_front = re.compile(r'[^a-z]front[^a-z]')
def find_good_artwork(files, base):
    """Pick the most suitable artwork file for the directory ``base``.

    ``files`` is a list of image paths and ``base`` the directory path, with
    or without a trailing slash.  A file named like the directory wins
    immediately; otherwise a name containing the word "front" beats one
    containing "cover", which beats any other name.  Ties are resolved by
    the lower-cased path.  Returns None when ``files`` is empty.
    """
    if not files:
        return None  # sorry, no files here
    head, basename = os.path.split(base)
    if not basename:
        # base ended with a slash: the directory name is the tail of the head
        basename = os.path.split(head)[1]
    basename = basename.strip().lower()
    candidates = []
    for name in files:
        # compare the bare file name (no directory, no extension)
        ref = os.path.splitext(os.path.basename(name))[0].strip().lower()
        # if the file has the same name as the directory, we'll use that directly
        if ref == basename:
            return name
        # the delimiters let the patterns match at the start and end of the name
        ref = "|%s|" % ref
        score = 0
        if re_cover.search(ref):
            # if the name contains the word "cover", it's a good candidate
            score = -1
        if re_front.search(ref):
            # if the name contains the word "front", that's even better
            score = -2
        candidates.append((score, name.lower(), name))
    candidates.sort()
    return candidates[0][2]  # return the candidate with the best score
def GenerateArtwork(model, tracklist):
    """Generate the ArtworkDB for *tracklist* and link the tracks to it.

    model     -- passed through to iTunesDB.ArtworkDB()
    tracklist -- list of track dictionaries; tracks that carry both an
                 'artwork' and a 'dbid' entry take part in artwork generation

    Afterwards every track's 'mhii link' entry is set to the value that
    ArtworkDB() reported for its dbid, or removed if there is none.
    Nothing is returned; problems are reported through log().
    """
    # step 0: check PIL availability
    if not iTunesDB.PILAvailable:
        log("ERROR: Python Imaging Library (PIL) isn't installed, Artwork is disabled.\n")
        log(" Visit http://www.pythonware.com/products/pil/ to get PIL.\n")
        return

    # step 1: generate an artwork list (artwork file -> dbids of its tracks)
    artwork_list = {}
    for track in tracklist:
        artwork = track.get('artwork', None)
        if not artwork:
            continue  # no artwork file
        dbid = track.get('dbid', None)
        if not dbid:
            continue  # artwork doesn't make sense without a dbid
        artwork_list.setdefault(artwork, []).append(dbid)

    # step 2: generate the artwork directory (if it doesn't exist already)
    try:
        os.mkdir(ARTWORK_DIR[:-1])
    except OSError:
        pass  # not critical (yet)

    # step 3: try to load the artwork cache
    # A damaged cache file may raise more than PickleError while unpickling;
    # in all of these cases we simply start over with an empty cache.
    try:
        with open(ARTWORK_CACHE_FILE, "rb") as f:
            old_cache = pickle.load(f)
    except (IOError, EOFError, pickle.PickleError,
            AttributeError, ImportError, IndexError):
        old_cache = ({}, {})

    # step 4: generate and save the ArtworkDB
    artwork_db, new_cache, dbid2mhii = iTunesDB.ArtworkDB(model, artwork_list, cache_data=old_cache)
    backup(ARTWORK_DB_FILE)
    # The ArtworkDB is a binary file and must be written in binary mode, just
    # like the iTunesDB itself.
    # NOTE(review): ArtworkDB() is assumed to return bytes; a str result is
    # converted 1:1 via latin-1 as a safety net -- confirm against iTunesDB.
    if isinstance(artwork_db, str):
        artwork_db = artwork_db.encode("latin-1")
    try:
        with open(ARTWORK_DB_FILE, "wb") as f:
            f.write(artwork_db)
    except IOError as e:
        log("FAILED: %s\n" % e.strerror +
            "ERROR: The ArtworkDB file could not be written. This means that the iPod will\n" +
            "not show any artwork items.\n")

    # step 5: save the artwork cache
    try:
        with open(ARTWORK_CACHE_FILE, "wb") as f:
            pickle.dump(new_cache, f)
    except (IOError, EOFError, pickle.PickleError):
        log("ERROR: can't save the artwork cache\n")

    # step 6: update the 'mhii link' field
    for track in tracklist:
        dbid = track.get('dbid', None)
        mhii = dbid2mhii.get(dbid, None)
        if mhii:
            track['mhii link'] = mhii
        elif 'mhii link' in track:
            del track['mhii link']
################################################################################
## FREEZE and UPDATE action ##
################################################################################
def Freeze(CacheInfo=None, UpdateOnly=False):
    """Build and write a fresh iTunesDB ("freeze" / "update" actions).

    CacheInfo  -- optional (state, cache) pair; loaded with load_cache()
                  when omitted
    UpdateOnly -- if true, the cached track list is reused as-is: no file
                  scan, no track ID generation and no artwork processing

    The resulting track list is saved with state "frozen". Returns None;
    progress and problems are reported through log().
    """
    global g_freeze_error_count
    if not CacheInfo:
        CacheInfo = load_cache((None, []))
    state, cache = CacheInfo
    if UpdateOnly:
        if (state != "frozen") and not Options['force']:
            confirm("\nNOTE: The database is not frozen, the update will not work as expected!\n")
    else:
        if (state == "frozen") and not Options['force']:
            confirm("\nNOTE: The database is already frozen.\n")
    state = "frozen"

    # allocate the filename allocator
    if not UpdateOnly:
        log("Scanning for present files ...\n", True)
        try:
            allocator = Allocator(MUSIC_DIR[:-1])
        except (IOError, OSError):
            log("FATAL: can't read or write the music directory!\n")
            return

    # parse the master playlist setup file
    skip_album_playlists, directory_playlists, master_playlists = parse_master_playlist_file()

    # index the track cache
    log("Indexing track cache ...\n", True)
    index = make_cache_index(cache)

    # now go for the real thing
    playlists = []
    if not UpdateOnly:
        log("Searching for playable files ...\n", True)
        tracklist = freeze_dir(cache, index, allocator, playlists)
        log("Scan complete: %d tracks found, %d error(s).\n" % (len(tracklist), g_freeze_error_count))
        # cache save checkpoint
        save_cache((state, tracklist))
    else:
        # in update mode, use the cached track list directly
        tracklist = cache

    # artwork processing
    if not UpdateOnly:
        model = Options['model']
        if not model:
            try:
                with open(MODEL_FILE, "r") as f:
                    model = f.read().strip()[:10].lower()
                log("\nLoaded model name `%s' from the cache.\n" % model)
            except IOError:
                pass
        if model:
            model = model.strip().lower()
            if model not in iTunesDB.ImageFormats:
                log("\nWARNING: model `%s' unrecognized, skipping Artwork generation.\n" % model)
                # really skip it: without this, artwork generation below
                # would still run with the unrecognized model name
                model = None
            else:
                try:
                    with open(MODEL_FILE, "w") as f:
                        f.write(model)
                except IOError:
                    pass
        else:
            log("\nNo model specified, skipping Artwork generation.\n")
    else:
        model = None

    # generate track IDs
    if not UpdateOnly:
        iTunesDB.GenerateIDs(tracklist)

    # generate the artwork list
    if model and not UpdateOnly:
        log("\nProcessing Artwork ...\n", True)
        GenerateArtwork(model, tracklist)

    # build the database
    log("\nCreating iTunesDB ...\n", True)
    db = iTunesDB.iTunesDB(tracklist, name="%s %s"%(__title__, __version__))

    # save the tracklist as the cache for the next run
    save_cache((state, tracklist))

    # add playlists according to the master playlist file
    for listspec in master_playlists:
        add_scripted_playlist(db, tracklist, *listspec)

    # process all m3u playlists
    if playlists:
        log("Updating track index ...\n", True)
        index = make_cache_index(tracklist)
        for plist in playlists:
            process_m3u(db, tracklist, index, plist, skip_album_playlists)

    # create directory playlists
    if directory_playlists:
        make_directory_playlists(db, tracklist)

    # finish iTunesDB and apply hash stuff
    log("Finalizing iTunesDB ...\n")
    db = db.finish()
    fwids = hash58.GetFWIDs()
    try:
        with open(FWID_FILE, "r") as f:
            fwid = f.read().strip().upper()
        if len(fwid) != 16:
            fwid = None
    except IOError:
        fwid = None
    store_fwid = False
    if fwid:
        # preferred FWID stored on iPod
        if fwids and fwid not in fwids:
            log("WARNING: Stored serial number doesn't match any connected iPod!\n")
    else:
        # auto-detect FWID
        if fwids:
            fwid = fwids[0]
            # only remember the FWID if it is unambiguous
            store_fwid = (len(fwids) == 1)
            if not store_fwid:
                log("WARNING: Multiple iPods are connected. If the iPod you are trying to freeze is\n" +
                    " a recent model, it might not play anything. Please try again with the\n" +
                    " other iPod unplugged.\n")
        else:
            log("WARNING: Could not determine your iPod's serial number. If it's a recent model,\n" +
                " it will likely not play anything!\n")
    if fwid:
        db = hash58.UpdateHash(db, fwid)
    if store_fwid and fwid:
        try:
            with open(FWID_FILE, "w") as f:
                f.write(fwid)
        except IOError:
            pass

    # write iTunesDB
    write_ok = True
    backup(DB_FILE)
    try:
        with open(DB_FILE, "wb") as f:
            f.write(db)
    except IOError as e:
        write_ok = False
        log("FAILED: %s\n" % e.strerror +
            "ERROR: The iTunesDB file could not be written. This means that the iPod will\n" +
            "not play anything.\n")

    # generate statistics
    if write_ok:
        log("\nYou can now unmount the iPod and listen to your music.\n")
        sec = int(sum([track.get('length', 0.0) for track in tracklist]) + 0.5)
        # integer arithmetic instead of relying on %d truncating floats
        minutes, seconds = divmod(sec, 60)
        hours, minutes = divmod(minutes, 60)
        log("There are %d tracks (%d:%02d:%02d" % (len(tracklist), hours, minutes, seconds))
        if sec > 86400:
            log(" = %.1f days" % (sec / 86400.0))
        log(") waiting for you to be heard.\n")

    # finally, save the tracklist as the cache for the next run
    save_cache((state, tracklist))
def Unfreeze(CacheInfo=None):
    """Move all cached tracks back to their original locations.

    CacheInfo -- optional (state, cache) pair; loaded with load_cache()
                 when omitted

    Tracks without an 'original path' entry are left alone. The cache is
    saved with state "unfrozen" afterwards.
    """
    if not CacheInfo:
        CacheInfo = load_cache((None, None))
    state, cache = CacheInfo
    if cache is None or not state:
        fatal("can't unfreeze: rePear cache is missing or broken")
        return
    if state != "frozen" and not Options['force']:
        confirm("\nNOTE: The database is already unfrozen.\n")

    log("Moving tracks back to their original locations ...\n")
    moved = 0
    errors = 0
    for entry in cache:  # type: ignore
        current = printable(entry.get('path', ""))
        original = printable(entry.get('original path', ""))
        if not current:
            log("ERROR: track lacks path attribute\n")
            continue
        if not original:
            continue  # no original path
        log("%s " % original)
        # a true result from move_file() is counted as a failure
        if move_file(current, original):
            errors += 1
        else:
            moved += 1

    log(f"Operation complete: {len(cache)} tracks total, {moved} moved back, {errors} failed.\n")
    log("\nYou can now manage the music files on your iPod.\n")
    save_cache(("unfrozen", cache))
def ConfigFWID():
    """Detect the serial number (FWID) of the attached iPod and store it.

    The FWID is only written to FWID_FILE when exactly one iPod is
    detected; in every other case a message is logged and nothing is saved.
    """
    log("Determining serial number (FWID) of attached iPods ...\n")
    detected = hash58.GetFWIDs()

    # read the FWID stored previously; it is only used if it has 16 characters
    try:
        with open(FWID_FILE, "r") as f:
            stored = f.read().strip().upper()
        if len(stored) != 16:
            stored = None
    except IOError:
        stored = None

    if not detected:
        # no FWIDs detected
        if stored:
            return log(f"No iPod detected, but FWID is already set up ({stored}).\n\n")
        return log("No iPod detected, can't determine FWID.\n\n")

    if len(detected) > 1:
        # multiple FWIDs detected
        if stored and (stored in detected):
            return log(f"Multiple iPods detected, but FWID is already set up ({stored}).\n\n")
        return log("Multiple iPods detected, can't determine FWID.\n" +
                   "Please unplug all iPods except the one you're configuring\n\n")

    # exactly one FWID detected
    serial = detected[0]
    log("Serial number detected: %s\n" % serial)
    if stored and (stored != serial):
        log("Warning: This serial number is different from the one that has been stored on\n" +
            " the iPod (%s). Storing the new FWID anyway.\n" % stored)
    if not serial:
        return log("\n")
    try:
        with open(FWID_FILE, "w") as f:
            f.write(serial)
        log("FWID saved.\n\n")
    except IOError:
        log("Error saving the FWID.\n\n")
# Table of selectable iPod models. Each entry lists the accepted model
# identifiers followed by the human-readable description as its last item;
# the first entry stands for "no model".
models = [
    [None, "other/unspecified (no cover artwork)"],
    ['photo', '4g', "iPod photo (4G)"],
    ['video', '5g', "iPod video (5G)"],
    ['classic', '6g', "iPod classic (6G)"],
    ['nano', 'nano1g', 'nano2g', "iPod nano (1G/2G)"],
    ['nano3g', "iPod nano (3G, \"fat nano\")"],
    ['nano4g', "iPod nano (4G)"],
]


def is_model_ok(mod_id):
    """Return True if *mod_id* is an identifier of a model in `models`.

    The first table entry ("other") and the descriptions are not considered.
    """
    return any(mod_id in entry[:-1] for entry in models[1:])
def ConfigModel():
    """Interactively ask for the iPod model and store it in MODEL_FILE.

    The model currently stored (if it is a known one) is marked with "*"
    and offered as the default. Choosing entry 0 removes the model file.
    """
    # determine the currently configured model, if any
    try:
        with open(MODEL_FILE, "r") as f:
            model = f.read().strip().lower()
        if not is_model_ok(model):
            model = None
    except IOError:
        model = None

    # show the menu
    print("Select iPod model:")
    default = 0
    for idx, entry in enumerate(models):
        if model in entry[:-1]:
            default = idx
            marker = "*"
        else:
            marker = " "
        print(marker, "%d." % idx, entry[-1])

    # ask; any unusable answer falls back to the default
    prompt = "Which model is this iPod? [0-%d, default %d] => " % (len(models) - 1, default)
    try:
        answer = int(input(prompt))
    except (IOError, EOFError, KeyboardInterrupt, ValueError):
        answer = default
    if not (0 <= answer < len(models)):
        answer = default

    chosen = models[answer]
    if answer and chosen and chosen[0] is not None:
        try:
            with open(MODEL_FILE, "w") as f:
                f.write(chosen[0])
            log("Model set to `%s'.\n\n" % chosen[-1])
        except IOError:
            log("Error: cannot set model.\n\n")
    else:
        delete(MODEL_FILE, True)
        log("Model set to `other'.\n\n")
# Matches one "key = value" line of an INI-style text. Group 1 is the optional
# ";" that comments the line out, group 2 is the key and group 3 the value
# without surrounding blanks.
re_ini_key = re.compile(r'^[ \t]*(;)?[ \t]*(\w+)[ \t]*=[ \t]*(.*?)[ \t]*$', re.M)


class INIKey:
    """Set one key of an INI-style text to a new value.

    Feed every match of `re_ini_key` over the text to check(), then call
    apply() with the same text to obtain the modified text.
    """

    def __init__(self, key, value):
        self.key = key          # key name, compared against the lower-cased match
        self.value = value      # new value to write
        self.present = False    # True once any line for this key was seen
        self.valid = False      # True if the remembered line is not commented out
        # position of the remembered line's value and comment marker
        self.start = self.end = self.comment = -1

    def check(self, m):
        """Remember match *m* if it is the best line seen so far for this key.

        An active (uncommented) line is never replaced by a commented one;
        among lines of the same kind the last one wins.
        """
        if not m:
            return
        if m.group(2).lower() != self.key:
            return
        self.present = True
        # a line is valid (active) when it has no ";" comment marker
        valid = not m.group(1)
        if self.valid and not valid:
            return
        self.start = m.start(3)
        self.end = m.end(3)
        self.comment = m.start(1)  # -1 if there is no comment marker
        self.valid = valid

    def apply(self, s):
        """Return *s* with this key set to the new value.

        If the key was never seen, a "key = value" line is appended.
        Otherwise the value of the remembered line is replaced and, if that
        line was commented out, its ";" is removed.
        """
        if not self.present:
            if not s.endswith("\n"):
                s += "\n"
            return s + "%s = %s\n" % (self.key, self.value)
        s = s[:self.start] + self.value + s[self.end:]
        # the comment marker precedes the value, so its index is still correct
        if not self.valid and (self.comment >= 0):
            s = s[:self.comment] + s[self.comment+1:]
        return s
def ConfigAll():
    """Run all configuration steps: first the FWID, then the model."""
    for step in (ConfigFWID, ConfigModel):
        step()
def Auto():
    """Unfreeze a frozen database, freeze it in any other state."""
    state, cache = load_cache((None, []))
    action = Unfreeze if state == 'frozen' else Freeze
    action((state, cache))
def Reset():
    """Clear rePear's metadata cache and the artwork cache.

    If the database is frozen and the 'force' option is not set, only the
    confirmation is issued and the caches are left untouched.
    """
    state, _cache = load_cache((None, []))
    needs_force = (state == 'frozen') and not Options['force']
    if needs_force:
        confirm("\nWARNING: The database is currently frozen. If you reset the cache now, you will\n\tlose all file name information. This cannot be undone!\n")
        # NOTE(review): returning here means a confirmed reset still does
        # nothing; the other confirm() callers in this file continue after
        # the prompt. Check against confirm()'s definition whether this
        # return is intended.
        return

    try:
        os.remove(CACHE_FILE)
    except OSError:
        # the cache file could not be removed: try to overwrite it with an
        # empty cache instead
        try:
            save_cache((None, []))
        except IOError:
            pass
    delete(ARTWORK_CACHE_FILE, True)
    log("\nCache reset.\n")
################################################################################
## the main function ##
################################################################################
class OptionParser(optparse.OptionParser):
    """optparse parser whose help text also lists artwork models and actions."""

    def format_help(self, formatter=None):
        """Return the standard help followed by the model and action lists."""
        standard_help = super().format_help(formatter)
        supported = ", ".join(sorted(iTunesDB.ImageFormats.keys()))
        artwork_intro = """
Artwork is supported on the following models:
"""
        actions_help = """
actions:
help show this help message and exit
freeze move all music files into the iPod's library
unfreeze move music files back to their original location
update update the frozen database without scanning for new files
dissect generate an Artist/Album/Title directory structure
reset clear rePear's metadata cache
cfg-fwid determine the iPod's serial number and save it
cfg-model interactively configure the iPod model
config run all of the configuration steps
If no action is specified, rePear automatically determines which of the
`freeze' or `unfreeze' actions should be taken.
"""
        return standard_help + artwork_intro + supported + actions_help
if __name__ == "__main__":
    # --- command line parsing ---
    parser = OptionParser(version=__version__, usage="%prog [options] [<action>]")
    parser.add_option("-r", "--root", action="store", default=None, metavar="PATH",
                      help="set the iPod's root directory path")
    parser.add_option("-l", "--log", action="store", default="repear.log", metavar="FILE",
                      help="set the output log file path")
    parser.add_option("-m", "--model", action="store", default=None, metavar="MODEL",
                      help="specify the iPod model (REQUIRED for artwork support)")
    parser.add_option("-L", "--lameopts", action="store", default=DEFAULT_LAME_OPTS, metavar="CMDLINE",
                      help="set the LAME encoder options (default: %s)" % DEFAULT_LAME_OPTS)
    parser.add_option("-f", "--force", action="store_true", default=False,
                      help="skip confirmation prompts for dangerous actions")
    parser.add_option("-p", "--playlist", action="store", default=None, metavar="FILE",
                      help="specify playlist config file")
    opts, args = parser.parse_args()
    Options = vars(opts)

    # --- action selection ---
    if len(args) > 1:
        parser.error("too many arguments")
    action = args[0].strip().lower() if args else "auto"
    if action == "help":
        parser.print_help()
        sys.exit(0)
    known_actions = (
        'auto', 'freeze', 'unfreeze', 'update', 'dissect', 'reset',
        'config', 'cfg-fwid', 'cfg-model',
    )
    if action not in known_actions:
        parser.error("invalid action `%s'" % action)

    # --- environment setup ---
    oldcwd = os.getcwd()  # remembered before goto_root_dir() is called
    open_log()
    log(f"{banner}\n{len(banner)*'-'}\n\n")
    if not logfile:
        log(f"WARNING: can't open log file `{Options['log']}', logging disabled\n\n")
    goto_root_dir()
    if Options['playlist']:
        # a playlist path starting with "." or ".." is resolved against the
        # directory rePear was started from
        first = Options['playlist'].replace("\\", "/").split('/', 1)[0]
        if first in (".", ".."):
            MASTER_PLAYLIST_FILE = os.path.normpath(os.path.join(oldcwd, Options['playlist']))
        else:
            MASTER_PLAYLIST_FILE = Options['playlist']
        log("master playlist file is `%s'\n" % MASTER_PLAYLIST_FILE)
    log("\n")

    # --- run the action ---
    try:
        handlers = {
            "auto": Auto,
            "freeze": Freeze,
            "unfreeze": Unfreeze,
            "update": lambda: Freeze(UpdateOnly=True),
            "dissect": Dissect,
            "reset": Reset,
            "config": ConfigAll,
            "cfg-fwid": ConfigFWID,
            "cfg-model": ConfigModel,
        }
        handler = handlers.get(action)
        if handler is None:
            log("Unknown action, don't know what to do.\n")
        else:
            handler()
        code = 0
    except SystemExit as e:
        sys.exit(e.code)
    except KeyboardInterrupt:
        log("\n" + 79*'-' + "\n\nAction aborted by user.\n")
        code = 2
    except:
        # top-level boundary: report every remaining error in the log
        log("\n" + 79*'-' + "\n\nOOPS -- rePear crashed!\n\n")
        traceback.print_exc(file=Logger)
        log("\nPlease inform the author of rePear about this crash by sending the\nrepear.log file.\n")
        code = 1
    quit(code)