From 93dbcada7d8c76699bc81b7da14cadc9120fa5c8 Mon Sep 17 00:00:00 2001 From: KubaPro010 Date: Mon, 1 Dec 2025 15:59:09 +0100 Subject: [PATCH] fix a lot of errors (pylance) --- iTunesDB.py | 4 +- repear.py | 583 +++++++++++++++++----------------------------------- 2 files changed, 193 insertions(+), 394 deletions(-) diff --git a/iTunesDB.py b/iTunesDB.py index 982b573..f356012 100644 --- a/iTunesDB.py +++ b/iTunesDB.py @@ -615,7 +615,7 @@ class ImageItemRecord(Record): def ArtworkDB(model, imagelist, base_id=0x40, cache_data=({}, {})): while isinstance(ImageFormats.get(model, None), str): model = ImageFormats[model] - if not model in ImageFormats: return None + if not model in ImageFormats: raise Exception format_cache, image_cache = cache_data formats = [] @@ -813,7 +813,7 @@ class DatabaseReader: return hh + self.f.read(size - 8) def __iter__(self): return self - def next(self): + def __next__(self): try: header = self._skip_header(Tags.IT) except (IOError, InvalidFormat): raise StopIteration data_size = struct.unpack(' b) - (a < b) # Python3 replacement for cmp() + import iTunesDB, mp3info, hash58 Options = {} @@ -335,14 +341,11 @@ class BalancedShuffle: random.shuffle(root[None]) # build a list of directories to shuffle - subdirs = filter(None, [root[None]] + \ - [self.shuffle(root[key]) for key in root if key]) + subdirs = list(filter(None, [root[None]] + [self.shuffle(root[key]) for key in root if key])) # check for "tail" cases - if not subdirs: - return [] - if len(subdirs) == 1: - return subdirs[0] + if not subdirs: return [] + if len(subdirs) == 1: return subdirs[0] # pad subdirectory list to a common length dircount = len(subdirs) @@ -354,7 +357,7 @@ class BalancedShuffle: last = -1 for i in range(maxlen): # determine the directory order for this "column" - order = range(dircount) + order = list(range(dircount)) random.shuffle(order) if (len(order) > 1) and (order[0] == last): order.append(order.pop(0)) @@ -364,15 +367,13 @@ class BalancedShuffle: last = order[-1] # produce a result - res.extend(filter(lambda x: x is not None, \ - [subdirs[j][i] for j in order])) + res.extend(filter(lambda x: x is not None, [subdirs[j][i] for j in order])) return res def fill(self, data, total): ones = len(data) invert = (ones > (total / 2)) - if invert: - ones = total - ones + if invert: ones = total - ones bitmap = [0] * total remain = total for fraction in range(ones, 0, -1): @@ -380,8 +381,7 @@ class BalancedShuffle: skip = float(remain) / fraction skip = random.randrange(int(0.9 * skip), int(1.1 * skip) + 2) remain -= min(max(1, skip), remain - fraction + 1) - if invert: - bitmap = [1-x for x in bitmap] + if invert: bitmap = [1-x for x in bitmap] offset = random.randrange(0, total) bitmap = bitmap[offset:] + bitmap[:offset] def decide(x): @@ -394,8 +394,7 @@ def ImportPlayCounts(cache, index): log("Updating play counts and ratings ... ", True) # open Play Counts file - try: - pc = iTunesDB.PlayCountsReader() + try: pc = iTunesDB.PlayCountsReader() except IOError: log("\n0 track(s) updated.\n") return False @@ -423,10 +422,8 @@ def ImportPlayCounts(cache, index): try: for item in pc: path = files[item.index] - try: - track = cache[index[path]] - except (KeyError, IndexError): - continue + try: track = cache[index[path]] + except (KeyError, IndexError): continue updated = False if item.play_count: track['play count'] = track.get('play count', 0) + item.play_count @@ -456,24 +453,12 @@ def ImportPlayCounts(cache, index): log("%d track(s) updated.\n" % update_count) return update_count - -################################################################################ -## DISSECT action ## -################################################################################ - def Dissect(): state, cache = load_cache((None, None)) if (state is not None) and not(Options['force']): - if state=="frozen": confirm(""" -WARNING: This action will put all the music files on your iPod into a completely -new directory structure. All previous file and directory names will be lost. -This also means that any iTunesDB backups you have will NOT work any longer! -""") - if state=="unfrozen": confirm(""" -WARNING: The database is currently unfrozen, so the following operations will -almost completely fail. -""") + if state == "frozen": confirm("\nWARNING: This action will put all the music files on your iPod into a completely new directory structure. All previous file and directory names will be lost. This also means that any iTunesDB backups you have will NOT work any longer!\n") + if state == "unfrozen": confirm("\nWARNING: The database is currently unfrozen, so the following operations will almost completely fail.\n") cache = [] try: @@ -487,8 +472,7 @@ almost completely fail. if not os.path.isfile(src): log("ERROR: file `%s' is found in database, but doesn't exist\n" % src) continue - if not info.get('title', None): - info.update(iTunesDB.GuessTitleAndArtist(info['path'])) + if not info.get('title', None): info.update(iTunesDB.GuessTitleAndArtist(info['path'])) ext = os.path.splitext(src)[1] base = DISSECT_BASE_DIR if info.get('artist', None): @@ -506,46 +490,31 @@ almost completely fail. serial += 1 dest = base + " (%d)"%serial + ext log("%s => %s " % (src, dest), True) - if move_file(src, dest): - continue # move failed + if move_file(src, dest): continue # move failed # create a placeholder cache entry - cache.append({ - 'path': src, - 'original path': str(dest, sys.getfilesystemencoding(), 'replace') - }) - except IOError: - fatal("can't read iTunes database file") + cache.append({'path': src, 'original path': dest}) + except IOError: fatal("can't read iTunes database file") except iTunesDB.InvalidFormat: - raise fatal("invalid iTunes database format") + raise # clear the cache save_cache(("unfrozen", cache)) - - -################################################################################ -## FREEZE utilities ## -################################################################################ - g_freeze_error_count = 0 def check_file(base, fn): - if fn.startswith('.'): - return None # skip dot-files and -directories + if fn.startswith('.'): return None # skip dot-files and -directories key, ext = [component.lower() for component in os.path.splitext(fn)] fullname = base + fn - try: - s = os.stat(fullname) + try: s = os.stat(fullname) except OSError: log("ERROR: directory entry `%s' is inaccessible\n" % fn) return None isfile = int(not(stat.S_ISDIR(s[stat.ST_MODE]))) - if isfile and not(stat.S_ISREG(s[stat.ST_MODE])): - return None # no directory and no normal file -> skip this crap - if not(isfile) and (fullname=="iPod_Control" or fullname=="iPod_Control/Music"): - isfile = -1 # trick the sort algorithm to move iPC/Music to front + if isfile and not(stat.S_ISREG(s[stat.ST_MODE])): return None # no directory and no normal file -> skip this crap + if not(isfile) and (fullname=="iPod_Control" or fullname=="iPod_Control/Music"): isfile = -1 # trick the sort algorithm to move iPC/Music to front return (isfile, fnrep(fn), fullname, s, ext, key) @@ -640,16 +609,13 @@ def move_music(src, dest, info): if move_file(src, dest): g_freeze_error_count += 1 return None # failed - else: - return info + else: return info def freeze_dir(cache, index, allocator: Allocator, playlists=[], base="", artwork=None): global g_freeze_error_count - try: - flist = list(filter(None, [check_file(base, fn) for fn in os.listdir(base or ".")])) - except KeyboardInterrupt: - raise + try: flist = list(filter(None, [check_file(base, fn) for fn in os.listdir(base or ".")])) + except KeyboardInterrupt: raise except: g_freeze_error_count += 1 log(base + "/\n" + " runtime error, traceback follows ".center(79, '-') + "\n") @@ -669,8 +635,7 @@ def freeze_dir(cache, index, allocator: Allocator, playlists=[], base="", artwor music.sort() # if there are no subdirs and no music files here, prune this directory - if not(directories) and not(music): - return [] + if not(directories) and not(music): return [] # generate name -> artwork file associations image_assoc = dict([(x[5], x[2]) for x in flist if (x[0] > 0) and (x[4] in (".jpg", ".png"))]) @@ -678,26 +643,21 @@ def freeze_dir(cache, index, allocator: Allocator, playlists=[], base="", artwor # find artwork files that are not associated to a file or directory unassoc_images = image_assoc.copy() for d0,d1,d2,d3,d4,key in directories: - if key in unassoc_images: - del unassoc_images[key] + if key in unassoc_images: del unassoc_images[key] for d0,d1,d2,d3,d4,key in music: - if key in unassoc_images: - del unassoc_images[key] + if key in unassoc_images: del unassoc_images[key] unassoc_images = list(unassoc_images.values()) unassoc_images.sort() # use one of the unassociated artwork files as this directory's artwork, # unless the inherited artwork file name is already a perfect match (i.e. # the directory name and the artwork name are identical) - if unassoc_images: - if not(artwork) or not(artwork.lower().startswith(base[:-1].lower())): - artwork = find_good_artwork(unassoc_images, base) + if unassoc_images and (not(artwork) or not(artwork.lower().startswith(base[:-1].lower()))): artwork = find_good_artwork(unassoc_images, base) # now that the artwork problem is solved, we start processing: # recurse into subdirectories first res = [] - for isfile, dummy, fullname, s, ext, key in directories: - res.extend(freeze_dir(cache, index, allocator, playlists, fullname + '/', artwork)) + for isfile, dummy, fullname, s, ext, key in directories: res.extend(freeze_dir(cache, index, allocator, playlists, fullname + '/', artwork)) # now process the local files locals = [] @@ -712,7 +672,8 @@ def freeze_dir(cache, index, allocator: Allocator, playlists=[], base="", artwor log(fullname + ' ', True) valid, info = find_in_cache(cache, index, fullname, s) if valid: - info['changed'] = 0 + assert info + info['changed'] = 0 # what? log("[cached] ", True) else: if info: @@ -723,13 +684,12 @@ def freeze_dir(cache, index, allocator: Allocator, playlists=[], base="", artwor path = fullname changed = 2 info = mp3info.GetAudioFileInfo(fullname) + assert info iTunesDB.FillMissingTitleAndArtist(info) info['changed'] = changed if not already_there: - if isinstance(info['path'], str): - info['original path'] = info['path'] - else: - info['original path'] = str(info['path'], sys.getfilesystemencoding(), 'replace') + if isinstance(info['path'], str): info['original path'] = info['path'] + else: info['original path'] = str(info['path'], sys.getfilesystemencoding(), 'replace') info['path'] = path # move the track to where it belongs @@ -738,8 +698,7 @@ def freeze_dir(cache, index, allocator: Allocator, playlists=[], base="", artwor if not(path) or os.path.exists(path) or not(os.path.isdir(os.path.split(path)[0])): # if anything is wrong with the path, generate a new one path = allocator.allocate() + ext - else: - allocator.add(path) + else: allocator.add(path) info['path'] = path info = move_music(fullname, path, info) if not info: continue # something failed @@ -752,27 +711,20 @@ def freeze_dir(cache, index, allocator: Allocator, playlists=[], base="", artwor # check for unique artist and album check = info.get('artist', None) - if not locals: - unique_artist = check - elif check != unique_artist: - unique_artist = False + if not locals: unique_artist = check + elif check != unique_artist: unique_artist = False check = info.get('album', None) - if not locals: - unique_album = check - elif check != unique_album: - unique_album = False + if not locals: unique_album = check + elif check != unique_album: unique_album = False # finally, append the track to the track list locals.append(info) except KeyboardInterrupt: log("\nInterrupted by user.\nContinue with next file or abort? [c/A] ") - try: - answer = input() - except (IOError, EOFError, KeyboardInterrupt): - answer = "" - if not answer.lower().startswith("c"): - raise + try: answer = input() + except (IOError, EOFError, KeyboardInterrupt): answer = "" + if not answer.lower().startswith("c"): raise except: g_freeze_error_count += 1 @@ -783,18 +735,12 @@ def freeze_dir(cache, index, allocator: Allocator, playlists=[], base="", artwor # if all files in this directory share the same album title, but differ # in the artist name, we assume it's a compilation if unique_album and not(unique_artist): - for info in locals: - info['compilation'] = 1 + for info in locals: info['compilation'] = 1 # combine the lists and return them res.extend(locals) return res - -################################################################################ -## playlist sorting ## -################################################################################ - def cmp_lst(a, b, order, empty_pos): a = max(a.get('last played time', 0), a.get('last skipped time', 0)) b = max(b.get('last played time', 0), b.get('last skipped time', 0)) @@ -805,29 +751,23 @@ def cmp_lst(a, b, order, empty_pos): if not b: return -empty_pos return order * cmp(a, b) -def cmp_path(a, b, order, empty_pos): - return order * trackcmp(a, b) +def cmp_path(a, b, order, empty_pos): return order * trackcmp(a, b) class cmp_key: - def __init__(self, key): - self.key = key - def __repr__(self): - return "%s(%s)" % (self.__class__.__name__, repr(self.key)) + def __init__(self, key): self.key = key + def __repr__(self): return "%s(%s)" % (self.__class__.__name__, repr(self.key)) def __call__(self, a, b, order, empty_pos): if self.key in a: if self.key in b: a = a[self.key] - if type(a) in (types.StringType, types.UnicodeType): a = a.lower() + if isinstance(a, (str, bytes)): a = a.lower() b = b[self.key] - if type(b) in (types.StringType, types.UnicodeType): b = b.lower() + if isinstance(b, (str, bytes)): b = b.lower() return order * cmp(a, b) - else: - return -empty_pos + else: return -empty_pos else: - if self.key in b: - return empty_pos - else: - return 0 + if self.key in b: return empty_pos + else: return 0 sort_criteria = { 'playcount': lambda a,b,o,e: o*cmp(a.get('play count', 0), b.get('play count', 0)), @@ -842,10 +782,8 @@ sort_criteria = { 'filesize': 'size', 'path': cmp_path, } -for nc in ('title', 'artist', 'album', 'compilation', 'rating', 'path', \ -'length', 'size', 'track number', 'year', 'bitrate', 'sample rate', 'volume', \ -'last played time', 'last skipped time', 'mtime', 'disc number', 'total discs', \ -'BPM', 'movie flag'): +for nc in ('title', 'artist', 'album', 'compilation', 'rating', 'path', 'length', 'size', 'track number', 'year', 'bitrate', 'sample rate', 'volume', \ +'last played time', 'last skipped time', 'mtime', 'disc number', 'total discs', 'BPM', 'movie flag'): sort_criteria[nc.replace(' ', '').lower()] = nc @@ -892,7 +830,7 @@ class SortSpec: def sort(self, tracks): self.tracks = tracks index = list(range(len(self.tracks))) - index.sort(self._cmp) + index.sort(key=cmp_to_key(self._cmp)) del self.tracks return [tracks[i] for i in index] @@ -903,19 +841,13 @@ class SortSpec: def __len__(self): return len(self.criteria) - -################################################################################ -## playlist processing ## -################################################################################ - def add_scripted_playlist(db, tracklist, list_name, include, exclude, shuffle=False, changemask=0, sort=None): if not(list_name) or not(include or changemask) or not(tracklist): return tracks = [] log("Processing playlist `%s': " % (list_name), True) for track in tracklist: - if not 'original path' in track: - continue # we don't know the real name of this file, so skip it + if not 'original path' in track: continue # we don't know the real name of this file, so skip it name = track['original path'].encode(sys.getfilesystemencoding(), 'replace').lower() ok = changemask & track.get('changed', 0) for pattern in include: @@ -926,45 +858,34 @@ def add_scripted_playlist(db, tracklist, list_name, include, exclude, shuffle=Fa if fnmatch.fnmatch(name, pattern): ok = False break - if ok: - tracks.append(track) - log("%d tracks\n" % len(tracks)) - if not tracks: - return + if ok: tracks.append(track) + log(f"{len(tracks)} tracks\n") + if not tracks: return if shuffle == 1: shuffle = BalancedShuffle() - for info in tracks: - shuffle.add(info.get('original path', None) or info.get('path', "???"), info) + for info in tracks: shuffle.add(info.get('original path', None) or info.get('path', "???"), info) tracks = shuffle.shuffle() - if shuffle == 2: - random.shuffle(tracks) - if sort: - tracks = sort.sort(tracks) + if shuffle == 2: random.shuffle(tracks) + if sort: tracks = sort.sort(tracks) db.add_playlist(tracks, list_name) def process_m3u(db, tracklist, index, filename, skip_album_playlists): - if not(filename) or not(tracklist): - return + if not(filename) or not(tracklist): return basedir, list_name = os.path.split(filename) list_name = str(os.path.splitext(list_name)[0], sys.getfilesystemencoding(), 'replace') log("Processing playlist `%s': " % (list_name), True) - try: - f = open(filename, "r") - except IOError as e: - log("ERROR: cannot open `%s': %s\n" % (filename, e.strerror)) + try: f = open(filename, "r") + except IOError as e: log("ERROR: cannot open `%s': %s\n" % (filename, e.strerror)) tracks = [] # collect all tracks for line in f: line = line.strip() - if line.startswith('#'): - continue # comment or EXTM3U line + if line.startswith('#'): continue # comment or EXTM3U line line = os.path.normpath(os.path.join(basedir, line)).replace("\\", "/").lower() - try: - tracks.append(tracklist[index[line]]) - except KeyError: - continue # file not found -> sad, but not fatal + try: tracks.append(tracklist[index[line]]) + except KeyError: continue # file not found -> sad, but not fatal f.close() # check if it's an album playlist @@ -978,8 +899,7 @@ def process_m3u(db, tracklist, index, filename, skip_album_playlists): elif info['album'] != ref_album: ok = True # "this playlist is mixed-album, so it's clean" break - else: - ok = False # "all known tracks are from the same album, how sad" + else: ok = False # "all known tracks are from the same album, how sad" if not ok: # now check if this playlist really covers the _whole_ album ok = len(tracks) @@ -998,8 +918,7 @@ def process_m3u(db, tracklist, index, filename, skip_album_playlists): # finish everything log("%d tracks\n" % len(tracks)) - if not tracks: - return + if not tracks: return db.add_playlist(tracks, list_name) @@ -1015,8 +934,8 @@ def make_directory_playlists(db, tracklist): dirs[dir].append(track) else: dirs[dir] = [track] - dirlist = dirs.keys() - dirlist.sort(fncmp) + dirlist = list(dirs.keys()) + dirlist.sort(key=cmp_to_key(fncmp)) for dir in dirlist: log("Processing playlist `%s': " % dir, True) @@ -1026,30 +945,24 @@ def make_directory_playlists(db, tracklist): db.add_playlist(tracks, dir) -shuffle_options = { - "0": 0, "no": 0, "off": 0, "false": 0, "disabled": 0 , "none": 0, - "1": 1, "yes": 1, "on": 1, "true": 0, "enabled": 1, "balanced": 1, - "2": 2, "random": 2, "standard": 2, +shuffle_options = {"0": 0, "no": 0, "off": 0, "false": 0, "disabled": 0, "none": 0, + "1": 1, "yes": 1, "on": 1, "true": 0, "enabled": 1, "balanced": 1, + "2": 2, "random": 2, "standard": 2 } def parse_master_playlist_file(): # helper function def yesno(s): - if s.lower() in ('true', 'enable', 'enabled', 'yes', 'y'): - return 1 - try: - return (int(s) != 0) - except ValueError: - return 0 + if s.lower() in ('true', 'enable', 'enabled', 'yes', 'y'): return 1 + try: return (int(s) != 0) + except ValueError: return 0 # default values skip_album_playlists = True directory_playlists = False lists = [] # now we're parsing - try: - f = open(MASTER_PLAYLIST_FILE, "r") - except IOError: - return (skip_album_playlists, directory_playlists, lists) + try: f = open(MASTER_PLAYLIST_FILE, "r") + except IOError: return (skip_album_playlists, directory_playlists, lists) include = [] exclude = [] list_name = None @@ -1062,8 +975,7 @@ def parse_master_playlist_file(): line = line.split(';', 1)[0].strip() if not line: continue if (line[0] == '[') and (line[-1] == ']'): - if list_name and (include or changemask): - lists.append((list_name, include, exclude, shuffle, changemask, sort)) + if list_name and (include or changemask): lists.append((list_name, include, exclude, shuffle, changemask, sort)) include = [] exclude = [] list_name = line[1:-1] @@ -1071,10 +983,8 @@ def parse_master_playlist_file(): changemask = 0 sort = SortSpec() continue - try: - key, value = [x.strip().replace("\\", "/") for x in line.split('=')] - except ValueError: - continue + try: key, value = [x.strip().replace("\\", "/") for x in line.split('=')] + except ValueError: continue key = key.lower().replace(' ', '_') if not value: log("WARNING: In %s:%d: key `%s' without a value\n" % (MASTER_PLAYLIST_FILE, lineno, key)) @@ -1086,57 +996,37 @@ def parse_master_playlist_file(): if list_name: log("WARNING: In %s:%d: global option `%s' inside a playlist\n" % (MASTER_PLAYLIST_FILE, lineno, key)) directory_playlists = yesno(value) elif key == "shuffle": - try: - shuffle = shuffle_options[value.lower()] - except KeyError: - log("WARNING: In %s:%d: invalid value `%s' for shuffle option\n" % (MASTER_PLAYLIST_FILE, lineno, value)) - elif key == "new": - changemask = (changemask & (~2)) | (yesno(value) << 1) - elif key == "changed": - changemask = (changemask & (~1)) | yesno(value) + try: shuffle = shuffle_options[value.lower()] + except KeyError: log("WARNING: In %s:%d: invalid value `%s' for shuffle option\n" % (MASTER_PLAYLIST_FILE, lineno, value)) + elif key == "new": changemask = (changemask & (~2)) | (yesno(value) << 1) + elif key == "changed": changemask = (changemask & (~1)) | yesno(value) elif key == "sort": - try: - sort = SortSpec(value) + sort - except SSParseError as e: - log("WARNING: In %s:%d: %s\n" % (MASTER_PLAYLIST_FILE, lineno, e)) + try: sort = SortSpec(value) + sort + except SSParseError as e: log("WARNING: In %s:%d: %s\n" % (MASTER_PLAYLIST_FILE, lineno, e)) elif key in ("include", "exclude"): - if value[0] == "/": - value = value[1:] + if value[0] == "/": value = value[1:] if os.path.isdir(value): - if value[-1] != "/": - value += "/" + if value[-1] != "/": value += "/" value += "*" - if key == "include": - include.append(value.lower()) - else: - exclude.append(value.lower()) - else: - log("WARNING: In %s:%d: unknown key `%s'\n" % (MASTER_PLAYLIST_FILE, lineno, key)) + if key == "include": include.append(value.lower()) + else: exclude.append(value.lower()) + else: log("WARNING: In %s:%d: unknown key `%s'\n" % (MASTER_PLAYLIST_FILE, lineno, key)) f.close() - if list_name and (include or changemask): - lists.append((list_name, include, exclude, shuffle, changemask, sort)) + if list_name and (include or changemask): lists.append((list_name, include, exclude, shuffle, changemask, sort)) return (skip_album_playlists, directory_playlists, lists) - -################################################################################ -## artwork ## -################################################################################ - re_cover = re.compile(r'[^a-z]cover[^a-z]') re_front = re.compile(r'[^a-z]front[^a-z]') def find_good_artwork(files, base): - if not files: - return None # sorry, no files here - dirname, basename = os.path.split(base) - if not basename: - dirname, basename = os.path.split(base) + if not files: return None # sorry, no files here + _, basename = os.path.split(base) + if not basename: _, basename = os.path.split(base) basename = basename.strip().lower() candidates = [] for name in files: ref = os.path.splitext(name)[0].strip().lower() # if the file has the same name as the directory, we'll use that directly - if ref == basename: - return name + if ref == basename: return name ref = "|%s|" % ref score = 0 if re_cover.search(ref): @@ -1149,7 +1039,6 @@ def find_good_artwork(files, base): candidates.sort() return candidates[0][2] # return the candidate with the best score - def GenerateArtwork(model, tracklist): # step 0: check PIL availability if not iTunesDB.PILAvailable: @@ -1161,11 +1050,9 @@ def GenerateArtwork(model, tracklist): artwork_list = {} for track in tracklist: artwork = track.get('artwork', None) - if not artwork: - continue # no artwork file + if not artwork: continue # no artwork file dbid = track.get('dbid', None) - if not dbid: - continue # artwork doesn't make sense without a dbid + if not dbid: continue # artwork doesn't make sense without a dbid if artwork in artwork_list: artwork_list[artwork].append(dbid) else: @@ -1196,17 +1083,14 @@ def GenerateArtwork(model, tracklist): # step 5: save the artwork cache try: with open(ARTWORK_CACHE_FILE, "wb") as f: pickle.dump(new_cache, f) - except (IOError, EOFError, pickle.PickleError): - log("ERROR: can't save the artwork cache\n") + except (IOError, EOFError, pickle.PickleError): log("ERROR: can't save the artwork cache\n") # step 6: update the 'mhii link' field for track in tracklist: dbid = track.get('dbid', None) mhii = dbid2mhii.get(dbid, None) - if mhii: - track['mhii link'] = mhii - elif 'mhii link' in track: - del track['mhii link'] + if mhii: track['mhii link'] = mhii + elif 'mhii link' in track: del track['mhii link'] ################################################################################ @@ -1219,22 +1103,15 @@ def Freeze(CacheInfo=None, UpdateOnly=False): state, cache = CacheInfo if UpdateOnly: - if (state != "frozen") and not(Options['force']): - confirm(""" -NOTE: The database is not frozen, the update will not work as expected! -""") + if (state != "frozen") and not(Options['force']): confirm("\nNOTE: The database is not frozen, the update will not work as expected!\n") else: - if (state == "frozen") and not(Options['force']): - confirm(""" -NOTE: The database is already frozen. -""") + if (state == "frozen") and not(Options['force']): confirm("\nNOTE: The database is already frozen.\n") state = "frozen" # allocate the filename allocator if not UpdateOnly: log("Scanning for present files ...\n", True) - try: - allocator = Allocator(MUSIC_DIR[:-1]) + try: allocator = Allocator(MUSIC_DIR[:-1]) except (IOError, OSError): log("FATAL: can't read or write the music directory!\n") return @@ -1266,25 +1143,19 @@ NOTE: The database is already frozen. try: with open(MODEL_FILE, "r") as f: model = f.read().strip()[:10].lower() log("\nLoaded model name `%s' from the cache.\n" % model) - except IOError: - pass + except IOError: pass if model: model = model.strip().lower() - if not(model in iTunesDB.ImageFormats): - log("\nWARNING: model `%s' unrecognized, skipping Artwork generation.\n" % model) + if not(model in iTunesDB.ImageFormats): log("\nWARNING: model `%s' unrecognized, skipping Artwork generation.\n" % model) else: - try: + try: with open(MODEL_FILE, "w") as f: f.write(model) - except IOError: - pass - else: - log("\nNo model specified, skipping Artwork generation.\n") - else: - model = None + except IOError: pass + else: log("\nNo model specified, skipping Artwork generation.\n") + else: model = None # generate track IDs - if not UpdateOnly: - iTunesDB.GenerateIDs(tracklist) + if not UpdateOnly: iTunesDB.GenerateIDs(tracklist) # generate the artwork list if model and not(UpdateOnly): @@ -1299,37 +1170,29 @@ NOTE: The database is already frozen. save_cache((state, tracklist)) # add playlists according to the master playlist file - for listspec in master_playlists: - add_scripted_playlist(db, tracklist, *listspec) + for listspec in master_playlists: add_scripted_playlist(db, tracklist, *listspec) # process all m3u playlists if playlists: log("Updating track index ...\n", True) index = make_cache_index(tracklist) - for plist in playlists: - process_m3u(db, tracklist, index, plist, skip_album_playlists) + for plist in playlists: process_m3u(db, tracklist, index, plist, skip_album_playlists) # create directory playlists - if directory_playlists: - make_directory_playlists(db, tracklist) + if directory_playlists: make_directory_playlists(db, tracklist) # finish iTunesDB and apply hash stuff log("Finalizing iTunesDB ...\n") db = db.finish() fwids = hash58.GetFWIDs() try: - f = open(FWID_FILE, "r") - fwid = f.read().strip().upper() - f.close() - if len(fwid) != 16: - fwid = None - except IOError: - fwid = None + with open(FWID_FILE, "r") as f: fwid = f.read().strip().upper() + if len(fwid) != 16: fwid = None + except IOError: fwid = None store_fwid = False if fwid: # preferred FWID stored on iPod - if fwids and not(fwid in fwids): - log("WARNING: Stored serial number doesn't match any connected iPod!\n") + if fwids and not(fwid in fwids): log("WARNING: Stored serial number doesn't match any connected iPod!\n") else: # auto-detect FWID if fwids: @@ -1344,21 +1207,17 @@ NOTE: The database is already frozen. " it will likely not play anything!\n") if fwid: db = hash58.UpdateHash(db, fwid) - if store_fwid: + if store_fwid and fwid: try: - f = open(FWID_FILE, "w") - f.write(fwid) - f.close() - except IOError: - pass + with open(FWID_FILE, "w") as f: f.write(fwid) + except IOError: pass # write iTunesDB write_ok = True backup(DB_FILE) try: - f = open(DB_FILE, "wb") - f.write(db) - f.close() + with open(DB_FILE, "wb") as f: + f.write(db) except IOError as e: write_ok = False log("FAILED: %s\n" % e.strerror + @@ -1376,75 +1235,47 @@ NOTE: The database is already frozen. # finally, save the tracklist as the cache for the next run save_cache((state, tracklist)) - -################################################################################ -## UNFREEZE action ## -################################################################################ - def Unfreeze(CacheInfo=None): if not CacheInfo: CacheInfo = load_cache((None, None)) state, cache = CacheInfo - try: - cache_len = len(cache) # type: ignore - except: - cache_len = None - - if not(state) or (cache_len is None): + if not state or cache is None: fatal("can't unfreeze: rePear cache is missing or broken") - raise Exception - if state!="frozen" and not(Options['force']): - confirm(""" -NOTE: The database is already unfrozen. -""") + return + + if state != "frozen" and not Options['force']: confirm("\nNOTE: The database is already unfrozen.\n") log("Moving tracks back to their original locations ...\n") - success = 0 - failed = 0 + success = failed = 0 for info in cache: # type: ignore src = printable(info.get('path', "")) dest = printable(info.get('original path', "")) if not src: log("ERROR: track lacks path attribute\n") continue - if not dest: - continue # no original path + if not dest: continue # no original path log("%s " % dest) - if move_file(src, dest): - failed += 1 - else: - success += 1 - log("Operation complete: %d tracks total, %d moved back, %d failed.\n" % \ - (len(cache), success, failed)) + if move_file(src, dest): failed += 1 + else: success += 1 + log(f"Operation complete: {len(cache)} tracks total, {success} moved back, {failed} failed.\n") log("\nYou can now manage the music files on your iPod.\n") save_cache(("unfrozen", cache)) - -################################################################################ -## the configuration actions ## -################################################################################ - def ConfigFWID(): log("Determining serial number (FWID) of attached iPods ...\n") fwids = hash58.GetFWIDs() try: - f = open(FWID_FILE, "r") - fwid = f.read().strip().upper() - f.close() - if len(fwid) != 16: - fwid = None - except IOError: - fwid = None + with open(FWID_FILE, "r") as f: fwid = f.read().strip().upper() + if len(fwid) != 16: fwid = None + except IOError: fwid = None if not fwids: # no FWIDs detected - if fwid: - return log("No iPod detected, but FWID is already set up (%s).\n\n" % fwid) - else: - return log("No iPod detected, can't determine FWID.\n\n") + if fwid: return log(f"No iPod detected, but FWID is already set up ({fwid}).\n\n") + else: return log("No iPod detected, can't determine FWID.\n\n") if len(fwids) > 1: # multiple FWIDs detected if fwid and (fwid in fwids): - return log("Multiple iPods detected, but FWID is already set up (%s).\n\n" % fwid) + return log(f"Multiple iPods detected, but FWID is already set up ({fwid}).\n\n") else: return log("Multiple iPods detected, can't determine FWID.\n" + \ "Please unplug all iPods except the one you're configuring\n\n") @@ -1454,30 +1285,24 @@ def ConfigFWID(): log("Warning: This serial number is different from the one that has been stored on\n" + \ " the iPod (%s). Storing the new FWID anyway.\n" % fwid) fwid = fwids[0] - if not fwid: - return log("\n") + if not fwid: return log("\n") try: - f = open(FWID_FILE, "w") - f.write(fwid) - f.close() + with open(FWID_FILE, "w") as f: f.write(fwid) log("FWID saved.\n\n") - except IOError: - log("Error saving the FWID.\n\n") + except IOError: log("Error saving the FWID.\n\n") - -models = ( - (None, "other/unspecified (no cover artwork)"), - ('photo', '4g', "iPod photo (4G)"), - ('video', '5g', "iPod video (5G)"), - ('classic', '6g', "iPod classic (6G)"), - ('nano', 'nano1g', 'nano2g', "iPod nano (1G/2G)"), - ('nano3g', "iPod nano (3G, \"fat nano\")"), - ('nano4g', "iPod nano (4G)"), -) +models = [ + [None, "other/unspecified (no cover artwork)"], + ['photo', '4g', "iPod photo (4G)"], + ['video', '5g', "iPod video (5G)"], + ['classic', '6g', "iPod classic (6G)"], + ['nano', 'nano1g', 'nano2g', "iPod nano (1G/2G)"], + ['nano3g', "iPod nano (3G, \"fat nano\")"], + ['nano4g', "iPod nano (4G)"], +] def is_model_ok(mod_id): for m in models[1:]: - if mod_id in m[:-1]: - return True + if mod_id in m[:-1]: return True return False def ConfigModel(): @@ -1491,23 +1316,16 @@ def ConfigModel(): if model in models[i][:-1]: default = i c = "*" - else: - c = " " + else: c = " " print(c, "%d." % i, models[i][-1]) - try: - answer = int(input("Which model is this iPod? [0-%d, default %d] => " % (len(models) - 1, default))) - except (IOError, EOFError, KeyboardInterrupt, ValueError): - answer = default - if (answer < 0) or (answer >= len(models)): - answer = default - if answer: + try: answer = int(input("Which model is this iPod? [0-%d, default %d] => " % (len(models) - 1, default))) + except (IOError, EOFError, KeyboardInterrupt, ValueError): answer = default + if (answer < 0) or (answer >= len(models)): answer = default + if answer and models[answer] and models[answer][0] is not None: try: - f = open(MODEL_FILE, "w") - f.write(models[answer][0]) - f.close() + with open(MODEL_FILE, "w") as f: f.write(models[answer][0]) log("Model set to `%s'.\n\n" % models[answer][-1]) - except IOError: - log("Error: cannot set model.\n\n") + except IOError: log("Error: cannot set model.\n\n") else: delete(MODEL_FILE, True) log("Model set to `other'.\n\n") @@ -1534,41 +1352,27 @@ class INIKey: if not s.endswith("\n"): s += "\n" return s + "%s = %s\n" % (self.key, self.value) s = s[:self.start] + self.value + s[self.end:] - if not(self.valid) and (self.comment >= 0): - s = s[:self.comment] + s[self.comment+1:] + if not(self.valid) and (self.comment >= 0): s = s[:self.comment] + s[self.comment+1:] return s def ConfigAll(): ConfigFWID() ConfigModel() - -################################################################################ -## the two minor ("also-ran") actions ## -################################################################################ - def Auto(): state, cache = load_cache((None, [])) - if state == 'frozen': - Unfreeze((state, cache)) - else: - Freeze((state, cache)) + if state == 'frozen': Unfreeze((state, cache)) + else: Freeze((state, cache)) def Reset(): state, cache = load_cache((None, [])) if (state == 'frozen') and not(Options['force']): - confirm(""" -WARNING: The database is currently frozen. If you reset the cache now, you will - lose all file name information. This cannot be undone! -""") + confirm("\nWARNING: The database is currently frozen. If you reset the cache now, you will\n\tlose all file name information. This cannot be undone!\n") return - try: - os.remove(CACHE_FILE) + try: os.remove(CACHE_FILE) except OSError: - try: - save_cache((None, [])) - except IOError: - pass + try: save_cache((None, [])) + except IOError: pass delete(ARTWORK_CACHE_FILE, True) log("\nCache reset.\n") @@ -1576,7 +1380,7 @@ WARNING: The database is currently frozen. If you reset the cache now, you will ## the main function ## ################################################################################ -class MyOptionParser(optparse.OptionParser): +class OptionParser(optparse.OptionParser): def format_help(self, formatter=None): models = list(iTunesDB.ImageFormats.keys()) models.sort() @@ -1600,8 +1404,7 @@ If no action is specified, rePear automatically determines which of the """ if __name__ == "__main__": - parser = MyOptionParser(version=__version__, - usage="%prog [options] []") + parser = OptionParser(version=__version__, usage="%prog [options] []") parser.add_option("-r", "--root", action="store", default=None, metavar="PATH", help="set the iPod's root directory path") parser.add_option("-l", "--log", action="store", default="repear.log", metavar="FILE", help="set the output log file path") parser.add_option("-m", "--model", action="store", default=None, metavar="MODEL", help="specify the iPod model (REQUIRED for artwork support)") @@ -1611,25 +1414,21 @@ if __name__ == "__main__": (opts, args) = parser.parse_args() Options = opts.__dict__ - if len(args)>1: parser.error("too many arguments") - if args: - action = args[0].strip().lower() - else: - action = "auto" + if len(args) > 1: parser.error("too many arguments") + if args: action = args[0].strip().lower() + else: action = "auto" if action == "help": parser.print_help() sys.exit(0) if not action in ( 'auto', 'freeze', 'unfreeze', 'update', 'dissect', 'reset', \ 'config', 'cfg-fwid', 'cfg-model' - ): - parser.error("invalid action `%s'" % action) + ): parser.error("invalid action `%s'" % action) oldcwd = os.getcwd() open_log() - log("%s\n%s\n\n" % (banner, len(banner) * '-')) - if not logfile: - log("WARNING: can't open log file `%s', logging disabled\n\n" % Options['log']) + log(f"{banner}\n{len(banner)*'-'}\n\n") + if not logfile: log(f"WARNING: can't open log file `{Options['log']}', logging disabled\n\n") goto_root_dir() if Options['playlist']: @@ -1640,15 +1439,15 @@ if __name__ == "__main__": log("\n") try: - if action=="auto": Auto() - elif action=="freeze": Freeze() - elif action=="unfreeze": Unfreeze() - elif action=="update": Freeze(UpdateOnly=True) - elif action=="dissect": Dissect() - elif action=="reset": Reset() - elif action=="config": ConfigAll() - elif action=="cfg-fwid": ConfigFWID() - elif action=="cfg-model": ConfigModel() + if action=="auto": Auto() + elif action=="freeze": Freeze() + elif action=="unfreeze": Unfreeze() + elif action=="update": Freeze(UpdateOnly=True) + elif action=="dissect": Dissect() + elif action=="reset": Reset() + elif action=="config": ConfigAll() + elif action=="cfg-fwid": ConfigFWID() + elif action=="cfg-model": ConfigModel() else: log("Unknown action, don't know what to do.\n") code = 0 except SystemExit as e: