22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def meta(query: List[str], downloader: Downloader) -> None:
    """
    This function applies metadata to the selected songs
    based on the file name.
    If song already has metadata, missing metadata is added

    ### Arguments
    - query: list of file or directory paths. Directories are scanned
      (non-recursively) for files whose extension is in FFMPEG_FORMATS;
      single files must have one of those extensions.
    - downloader: Already initialized downloader instance.

    ### Notes
    - Files are processed concurrently: each file is handled in an executor
      thread, with concurrency bounded by `downloader.semaphore`.
    - When the `redownload` setting is enabled, every processed file that
      has a stored url is downloaded again afterwards.
    """

    # Create a list of all songs from all paths in query
    paths: List[Path] = []
    for path in query:
        test_path = Path(path)
        if not test_path.exists():
            logger.error("Path does not exist: %s", path)
            continue

        if test_path.is_dir():
            for out_format in FFMPEG_FORMATS:
                paths.extend(test_path.glob(f"*.{out_format}"))
        elif test_path.is_file():
            if test_path.suffix.split(".")[-1] not in FFMPEG_FORMATS:
                logger.error("File is not a supported audio format: %s", path)
                continue

            paths.append(test_path)

    def write_lrc(file: Path, make_song) -> None:
        # Generate a .lrc file next to `file` unless one already exists.
        # `make_song` is a zero-argument callable returning the Song; it is
        # only invoked when a .lrc file actually has to be generated.
        lrc_file = file.with_suffix(".lrc")
        if lrc_file.exists():
            logger.info("Lrc file already exists for %s", file.name)
            return

        song = make_song()
        generate_lrc(song, file)

        # generate_lrc reports nothing back here, so success is detected by
        # checking whether the file appeared on disk.
        if lrc_file.exists():
            logger.info("Saved lrc file for %s", song.display_name)
        else:
            logger.info("Could not find lrc file for %s", song.display_name)

    def process_file(file: Path) -> None:
        # metadata of the file, url is present in the file.
        song_meta = get_file_metadata(file, downloader.settings["id3_separator"])

        # Check if song has metadata
        # and if it has all the required fields
        # if it has all of these fields, we can assume that the metadata is correct
        if (
            song_meta
            and not downloader.settings["force_update_metadata"]
            and song_meta.get("artist")
            and song_meta.get("artists")
            and song_meta.get("name")
            and song_meta.get("lyrics")
            and song_meta.get("album_art")
        ):
            logger.info("Song already has metadata: %s", file.name)
            if downloader.settings["generate_lrc"]:
                write_lrc(
                    file,
                    lambda: Song.from_missing_data(
                        name=song_meta["name"],
                        artists=song_meta["artists"],
                        artist=song_meta["artist"],
                    ),
                )

            return None

        # Same as above
        if (
            not song_meta
            or None
            in [
                song_meta.get("name"),
                song_meta.get("album_art"),
                song_meta.get("artist"),
                song_meta.get("artists"),
                song_meta.get("track_number"),
            ]
            or downloader.settings["force_update_metadata"]
        ):
            # Song does not have metadata, or it is missing some fields
            # or we are forcing update of metadata
            # so we search for it
            logger.debug("Searching metadata for %s", file.name)
            search_results = get_search_results(file.stem)
            if not search_results:
                logger.error("Could not find metadata for %s", file.name)
                return None

            song = search_results[0]
        else:
            # Song has metadata, so we use it to reinitialize the song object
            # and fill in the missing metadata
            try:
                song = reinit_song(Song.from_missing_data(**song_meta))
            except QueryError:
                logger.error("Could not find metadata for %s", file.name)
                return None

        # Check if the song has lyric
        # if not use downloader to find lyrics
        if song_meta is None or song_meta.get("lyrics") is None:
            logger.debug("Fetching lyrics for %s", song.display_name)
            song.lyrics = downloader.search_lyrics(song)
            if song.lyrics:
                logger.info("Found lyrics for song: %s", song.display_name)
        else:
            # Keep the lyrics that are already stored in the file
            song.lyrics = song_meta.get("lyrics")

        # Apply metadata to the song
        embed_metadata(file, song, skip_album_art=downloader.settings["skip_album_art"])

        logger.info("Applied metadata to %s", file.name)

        if downloader.settings["generate_lrc"]:
            write_lrc(file, lambda: song)

        return None

    async def pool_worker(file_path: Path) -> None:
        async with downloader.semaphore:
            # The following function calls blocking code, which would block whole event loop.
            # Therefore it has to be called in a separate thread via ThreadPoolExecutor. This
            # is not a problem, since GIL is released for the I/O operations, so it shouldn't
            # hurt performance.
            await downloader.loop.run_in_executor(None, process_file, file_path)

    tasks = [pool_worker(path) for path in paths]

    # call all task asynchronously, and wait until all are finished
    downloader.loop.run_until_complete(asyncio.gather(*tasks))

    # to re-download the local songs
    if downloader.settings["redownload"]:
        songs_url: List[str] = []
        for file in paths:
            meta_data = get_file_metadata(file, downloader.settings["id3_separator"])

            # Use .get so a file without a stored url is skipped instead of
            # raising KeyError and aborting the whole redownload.
            url = meta_data.get("url") if meta_data else None
            if url:
                songs_url.append(url)

        songs_list = parse_query(
            query=songs_url,
            threads=downloader.settings["threads"],
            use_ytm_data=downloader.settings["ytm_data"],
            playlist_numbering=downloader.settings["playlist_numbering"],
            album_type=downloader.settings["album_type"],
            playlist_retain_track_cover=downloader.settings[
                "playlist_retain_track_cover"
            ],
        )

        downloader.download_multiple_songs(songs_list)
|