links = []
mode = "-d"
+ force = False
assert len(args) > 0, "no args :("
for arg in args:
if arg in modes:
mode = arg
-
+ if arg == '-f': # force
+ force = True
if "youtube" in arg or "youtu.be" in arg:
links.extend(arg.split(" "))
links = check_playlist(links)
assert len(links) > 0, "Should be at least one song in playlist"
- if mode == "-d":
- pass
- elif mode == "-a":
- # Use ThreadPoolExecutor to run downloads concurrently
- with concurrent.futures.ThreadPoolExecutor() as executor:
- # Schedule the download_audio_stream function for each audio stream
- futures = {executor.submit(get_and_download, link): link for link in links}
-
- elif mode == "-v":
- pass
- elif mode == "-av":
- pass
+ # Use ThreadPoolExecutor to run downloads concurrently
+ with concurrent.futures.ThreadPoolExecutor() as executor:
+ # Schedule the download_audio_stream function for each audio stream
+ futures = {executor.submit(get_and_download, link, mode, force): link for link in links}
if __name__ == '__main__':
return filename
-def get_and_download(link):
+def download_thumbnail(thumbnail_url, thumbnail_filename):
+ data = requests.get(thumbnail_url).content
+ with open(thumbnail_filename, 'wb') as f:
+ f.write(data)
+
+def get_and_download(link, mode, force=False):
yt = YouTube(link)
- if fix_filename(yt.title) + '.mp4' in glob.glob("*.mp4"):
+ filename = fix_filename(yt.title)
+ if (filename + '.mp4' in glob.glob("*.mp4")) and not force:
print(f"{yt.title} is already downloaded")
return
-
yt.check_availability()
- print(f"Fetching stream for {yt.title}")
- assert len(yt.streams.filter(only_audio=True)) > 0, "No available audio streams"
- audio_stream = yt.streams.filter(only_audio=True).order_by("abr").last()
+ print(f"Fetching stream for {yt.title}")
+ stream = None
+ if mode == "-a":
+ assert len(yt.streams.filter(only_audio=True)) > 0, "No available audio streams"
+ stream = yt.streams.filter(only_audio=True).order_by("abr").last()
+ if mode == "-v":
+ assert len(yt.streams.filter(only_video=True)) > 0, "No available video streams"
+ stream = yt.streams.filter(only_video=True).order_by("resolution").last()
- print(f"Downloading audio stream for {yt.title}")
- audio_stream.download(filename=fix_filename(audio_stream.default_filename), skip_existing=True)
+ assert stream is not None, "mode is not valid"
+ print(f"Downloading stream for {yt.title}")
+ default_filename = "default " + fix_filename(stream.default_filename)
+ stream.download(filename=default_filename, skip_existing=True)
- # create thumbnail file
- data = requests.get(yt.thumbnail_url).content
- thumbnail_filename = f'{fix_filename(audio_stream.title)}.jpg'
- with open(thumbnail_filename, 'wb') as f:
- f.write(data)
+ thumbnail_filename = f'{filename}.jpg'
+ download_thumbnail(yt.thumbnail_url, thumbnail_filename)
command = [
'ffmpeg',
- '-i', fix_filename(audio_stream.default_filename),
+ '-i', default_filename,
'-i', thumbnail_filename,
'-map', '1',
'-map', '0',
'-c', 'copy',
'-disposition:v:0', 'attached_pic',
- '-metadata', f'title={fix_filename(yt.title)}',
+ '-metadata', f'title={filename}',
'-metadata', f'artist={yt.author}',
'-metadata', f'comment={big_num_format(yt.views) + " views"}',
'-metadata', f'date={yt.publish_date}',
- fix_filename(audio_stream.title) + ".mp4",
+ filename + ".mp4",
'-y'
]
subprocess.run(command)
# clean up tmp files
os.remove(thumbnail_filename)
- os.remove(fix_filename(audio_stream.default_filename))
+ os.remove(default_filename)