Move music transfer main code into main() function

This is both good practice, and also useful for some soon-to-be-added
test cases for the script.
master
Jordan Atwood 9 months ago
parent 4619c677ab
commit 8c2f67f31f
Signed by: nightfirecat
GPG Key ID: 615A619C2D73A6DF
  1. 248
      src/.bin/music-transfer.py

@ -32,127 +32,131 @@ KNOWN_EXTENSIONS: Set[str] = set([
def munge_path(path: str) -> str: def munge_path(path: str) -> str:
return re.sub(r'[\:*?"|<>]+', '-', path) return re.sub(r'[\:*?"|<>]+', '-', path)
if len(sys.argv) != 3: def main():
print(f'Usage: {os.path.basename(__file__)} PATHS-FILE DESTINATION-DIR', file=sys.stderr) if len(sys.argv) != 3:
sys.exit(1) print(f'Usage: {os.path.basename(__file__)} PATHS-FILE DESTINATION-DIR', file=sys.stderr)
sys.exit(1)
paths_file: str = sys.argv[1]
destination_dir: str = sys.argv[2] paths_file: str = sys.argv[1]
destination_dir: str = sys.argv[2]
if not os.path.exists(paths_file):
print(f'PATHS-FILE \'{paths_file}\' does not exist or is not readable', file=sys.stderr) if not os.path.exists(paths_file):
sys.exit(2) print(f'PATHS-FILE \'{paths_file}\' does not exist or is not readable', file=sys.stderr)
sys.exit(2)
print('[DEBUG] Gathering destination files list')
destination_files: Set[Path] = set() print('[DEBUG] Gathering destination files list')
if os.path.isdir(destination_dir): destination_files: Set[Path] = set()
for dirpath, _, files in os.walk(destination_dir): if os.path.isdir(destination_dir):
for file in files: for dirpath, _, files in os.walk(destination_dir):
path = Path(os.path.join(dirpath, file)).absolute()
if path.suffix == '.mp3':
if path in destination_files:
print(f'Ignoring duplicate input entry: {file}')
else:
destination_files.add(path)
print(f'[DEBUG] Gathered {len(destination_files)} destination files')
print('[DEBUG] Gathering source files list')
source_files: Set[Path] = set()
for line in open(paths_file, 'r'):
trimmed_line = line.strip()
if not trimmed_line:
continue
path = Path(trimmed_line).absolute()
if path.is_file():
if path.suffix in KNOWN_EXTENSIONS:
source_files.add(path)
else:
print(f'[TRACE] Unknown file extension of entry: {trimmed_line}', file=sys.stderr)
elif path.is_dir():
for dirpath, _, files in os.walk(path.absolute()):
# TODO: DRY (see above)
for file in files: for file in files:
file_path = Path(os.path.join(dirpath, file)).absolute() path = Path(os.path.join(dirpath, file)).absolute()
if file_path.suffix in KNOWN_EXTENSIONS: if path.suffix == '.mp3':
source_files.add(file_path) if path in destination_files:
else: print(f'Ignoring duplicate input entry: {file}')
print(f'[TRACE] Unknown file extension of entry: {file}', file=sys.stderr) else:
elif not path.exists(): destination_files.add(path)
print(f'[ERROR] Could not find source path: {path}') print(f'[DEBUG] Gathered {len(destination_files)} destination files')
sys.exit(3)
print('[DEBUG] Gathering source files list')
if len(source_files) == 0: source_files: Set[Path] = set()
print('No source files to transfer') for line in open(paths_file, 'r'):
sys.exit() trimmed_line = line.strip()
else: if not trimmed_line:
print(f'[DEBUG] Gathered {len(source_files)} source files') continue
print('[DEBUG] Finding longest common prefix of source files') path = Path(trimmed_line).absolute()
longest_prefix = next(iter(source_files)).parent if path.is_file():
for file in source_files: if path.suffix in KNOWN_EXTENSIONS:
if longest_prefix.as_posix() == '/': source_files.add(path)
break else:
print(f'[TRACE] Unknown file extension of entry: {trimmed_line}', file=sys.stderr)
while not file.as_posix().startswith(longest_prefix.as_posix()): elif path.is_dir():
longest_prefix = longest_prefix.parent for dirpath, _, files in os.walk(path.absolute()):
# TODO: DRY (see above)
print('[DEBUG] Filtering files already present in destination') for file in files:
# NOTE: this assumes all filenames are unique, which should already be the case in my music library file_path = Path(os.path.join(dirpath, file)).absolute()
# verify via `find . -type f -not -name '*.jpg' -not -name '*.png' -not -name '*.txt' -exec basename {} \; | sort | uniq -d` if file_path.suffix in KNOWN_EXTENSIONS:
# TODO: add a hash check so that we can overwrite songs on the target if source source_files.add(file_path)
# has a version which is different (new metadata, or better version) else:
for source_path in list(source_files)[:]: print(f'[TRACE] Unknown file extension of entry: {file}', file=sys.stderr)
found_destination_path: Union[Path, None] = next((destination_file for destination_file in destination_files if destination_file.stem == munge_path(source_path.stem)), None) elif not path.exists():
if found_destination_path: print(f'[ERROR] Could not find source path: {path}')
source_files.remove(source_path) sys.exit(3)
destination_files.remove(found_destination_path)
if len(source_files) == 0:
if len(source_files) == 0: print('No source files to transfer')
print('All source files already exist in target') sys.exit()
sys.exit()
# TODO: improve prompt (print 10+ file paths or exit to `PAGER` to view before prompt so user has more info before confirming)
if len(destination_files) > 0:
delete_selection_made = False
while not delete_selection_made:
print(f'Files in destination not found in source files list:\n{destination_files}')
delete_input = input(f'Delete {len(destination_files)} files not found in source files list? [y/n] ')
if delete_input[0] == 'y' or delete_input[0] == 'Y':
delete_selection_made = True
for destination_file in destination_files:
destination_file.unlink()
elif delete_input[0] == 'n' or delete_input[0] == 'N':
delete_selection_made = True
# TODO: list both number of files in songs list & number of files to be transferred
print(f'Copying {len(source_files)} songs to {destination_dir}')
print(f'[TRACE] Copying the following files to destination:\n{source_files}')
for source_file in source_files:
source_copy_path = source_file.with_suffix('').as_posix().replace(longest_prefix.as_posix(), '', 1)
destination_filename = munge_path(source_copy_path)
destination_file_path = f'{destination_dir}/{destination_filename}.mp3'
# TODO: double-check on this
print(f'destination_file_path: {destination_file_path}')
Path(destination_file_path).parent.mkdir(parents = True, exist_ok = True)
if source_file.stem == '.mp3':
shutil.copy2(source_file.as_posix(), destination_file_path)
else: else:
subprocess.call([ print(f'[DEBUG] Gathered {len(source_files)} source files')
'ffmpeg',
'-loglevel', 'quiet', print('[DEBUG] Finding longest common prefix of source files')
'-i', source_file.as_posix(), longest_prefix = next(iter(source_files)).parent
'-codec:a', 'libmp3lame', for file in source_files:
# convert to mp3 with quality level 3 vbr (average 175 kbit/s, ranges from 150-195 kbit/s) if longest_prefix.as_posix() == '/':
# see: https://trac.ffmpeg.org/wiki/Encode/MP3#VBREncoding break
'-q:a', '3',
destination_file_path while not file.as_posix().startswith(longest_prefix.as_posix()):
]) longest_prefix = longest_prefix.parent
print('[DEBUG] Filtering files already present in destination')
# TODO: improve this output (number of deleted files, number of new files, correct plural of 'files') # NOTE: this assumes all filenames are unique, which should already be the case in my music library
print(f'Finished copying {len(source_files)} new files to {destination_dir}') # verify via `find . -type f -not -name '*.jpg' -not -name '*.png' -not -name '*.txt' -exec basename {} \; | sort | uniq -d`
# TODO: add a hash check so that we can overwrite songs on the target if source
# has a version which is different (new metadata, or better version)
for source_path in list(source_files)[:]:
found_destination_path: Union[Path, None] = next((destination_file for destination_file in destination_files if destination_file.stem == munge_path(source_path.stem)), None)
if found_destination_path:
source_files.remove(source_path)
destination_files.remove(found_destination_path)
if len(source_files) == 0:
print('All source files already exist in target')
sys.exit()
# TODO: improve prompt (print 10+ file paths or exit to `PAGER` to view before prompt so user has more info before confirming)
if len(destination_files) > 0:
delete_selection_made = False
while not delete_selection_made:
print(f'Files in destination not found in source files list:\n{destination_files}')
delete_input = input(f'Delete {len(destination_files)} files not found in source files list? [y/n] ')
if delete_input[0] == 'y' or delete_input[0] == 'Y':
delete_selection_made = True
for destination_file in destination_files:
destination_file.unlink()
elif delete_input[0] == 'n' or delete_input[0] == 'N':
delete_selection_made = True
# TODO: list both number of files in songs list & number of files to be transferred
print(f'Copying {len(source_files)} songs to {destination_dir}')
print(f'[TRACE] Copying the following files to destination:\n{source_files}')
for source_file in source_files:
source_copy_path = source_file.with_suffix('').as_posix().replace(longest_prefix.as_posix(), '', 1)
destination_filename = munge_path(source_copy_path)
destination_file_path = f'{destination_dir}/{destination_filename}.mp3'
# TODO: double-check on this
print(f'destination_file_path: {destination_file_path}')
Path(destination_file_path).parent.mkdir(parents = True, exist_ok = True)
if source_file.stem == '.mp3':
shutil.copy2(source_file.as_posix(), destination_file_path)
else:
subprocess.call([
'ffmpeg',
'-loglevel', 'quiet',
'-i', source_file.as_posix(),
'-codec:a', 'libmp3lame',
# convert to mp3 with quality level 3 vbr (average 175 kbit/s, ranges from 150-195 kbit/s)
# see: https://trac.ffmpeg.org/wiki/Encode/MP3#VBREncoding
'-q:a', '3',
destination_file_path
])
# TODO: improve this output (number of deleted files, number of new files, correct plural of 'files')
print(f'Finished copying {len(source_files)} new files to {destination_dir}')
if __name__ == '__main__':
main()

Loading…
Cancel
Save