Fetch Tidal playlists/tracks asynchronously in chunks

This commit is contained in:
Tim Rae
2024-06-09 13:57:32 +02:00
parent 193e74b775
commit ced8db44fe
3 changed files with 50 additions and 27 deletions

View File

@@ -23,7 +23,7 @@ def main():
if args.uri:
# if a playlist ID is explicitly provided as a command line argument then use that
spotify_playlist = spotify_session.playlist(args.uri)
tidal_playlists = _sync.get_tidal_playlists_dict(tidal_session)
tidal_playlists = _sync.get_tidal_playlists_wrapper(tidal_session)
tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists)
_sync.sync_playlists_wrapper(spotify_session, tidal_session, [tidal_playlist], config)
sync_favorites = args.sync_favorites # only sync favorites if command line argument explicitly passed

View File

@@ -11,7 +11,7 @@ import requests
import sys
import spotipy
import tidalapi
from .tidalapi_patch import add_multiple_tracks_to_playlist, clear_tidal_playlist, get_all_favorites
from .tidalapi_patch import add_multiple_tracks_to_playlist, clear_tidal_playlist, get_all_favorites, get_all_playlists, get_all_playlist_tracks, get_all_playlist_tracks
import time
from tqdm.asyncio import tqdm as atqdm
from tqdm import tqdm
@@ -134,15 +134,6 @@ async def tidal_search(spotify_track, rate_limiter, tidal_session: tidalapi.Sess
# if none of the search modes succeeded then store the track id to the failure cache
failure_cache.cache_match_failure(spotify_track['id'])
def get_tidal_playlists_dict(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
# a dictionary of name --> playlist
print("Loading Tidal playlists... This may take some time.")
tidal_playlists = tidal_session.user.playlists()
output = {}
for playlist in tidal_playlists:
output[playlist.name] = playlist
return output
async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
# utility to repeat calling the function up to 5 times if an exception is thrown
try:
@@ -291,7 +282,7 @@ async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalap
# Extract the new tracks from the playlist that we haven't already seen before
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
old_tidal_tracks = tidal_playlist.tracks()
old_tidal_tracks = await get_all_playlist_tracks(tidal_playlist)
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
await search_new_tracks_on_tidal(tidal_session, spotify_tracks, spotify_playlist['name'], config)
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
@@ -346,6 +337,10 @@ def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tida
def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config))
def get_tidal_playlists_wrapper(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
tidal_playlists = asyncio.run(get_all_playlists(tidal_session.user))
return {playlist.name: playlist for playlist in tidal_playlists}
def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]):
if spotify_playlist['name'] in tidal_playlists:
# if there's an existing tidal playlist with the name of the current playlist then use that
@@ -357,7 +352,7 @@ def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists:
def get_user_playlist_mappings(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
results = []
spotify_playlists = asyncio.run(get_playlists_from_spotify(spotify_session, config))
tidal_playlists = get_tidal_playlists_dict(tidal_session)
tidal_playlists = get_tidal_playlists_wrapper(tidal_session)
for spotify_playlist in spotify_playlists:
results.append( pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) )
return results

View File

@@ -27,25 +27,53 @@ def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids:
offset += count
progress.update(count)
async def get_all_favorites(favorites: tidalapi.Favorites, order: str = "NAME", order_direction: str = "ASC",) -> List[tidalapi.Track]:
""" Get all favorites from Tidal playlist in chunks. The main library doesn't provide the total number of items or expose the raw json, so need this wrapper """
params = {
"limit": None,
"offset": 0,
"order": order,
"orderDirection": order_direction,
}
first_chunk_raw = favorites.requests.map_request(f"{favorites.base_url}/tracks", params)
async def _get_all_chunks(url, session, parser, params={}) -> List[tidalapi.Track]:
"""
Helper function to get all items from a Tidal endpoint in parallel
The main library doesn't provide the total number of items or expose the raw json, so use this wrapper instead
"""
def _make_request(offset: int=0):
new_params = params
new_params['offset'] = offset
return session.request.map_request(url, params=new_params)
first_chunk_raw = _make_request()
limit = first_chunk_raw['limit']
total = first_chunk_raw['totalNumberOfItems']
tracks = favorites.session.request.map_json(first_chunk_raw, parse=favorites.session.parse_track)
items = session.request.map_json(first_chunk_raw, parse=parser)
if len(tracks) < total:
if len(items) < total:
offsets = [limit * n for n in range(1, math.ceil(total/limit))]
extra_results = await atqdm.gather(
*[asyncio.to_thread(lambda offset: favorites.tracks(offset=offset, order=order, order_direction=order_direction), offset) for offset in offsets],
*[asyncio.to_thread(lambda offset: session.request.map_json(_make_request(offset), parse=parser), offset) for offset in offsets],
desc="Fetching additional data chunks"
)
for extra_result in extra_results:
tracks.extend(extra_result)
return tracks
items.extend(extra_result)
return items
async def get_all_favorites(favorites: tidalapi.Favorites, order: str = "NAME", order_direction: str = "ASC", chunk_size: int=100) -> List[tidalapi.Track]:
""" Get all favorites from Tidal playlist in chunks """
params = {
"limit": chunk_size,
"order": order,
"orderDirection": order_direction,
}
return await _get_all_chunks(f"{favorites.base_url}/tracks", session=favorites.session, parser=favorites.session.parse_track, params=params)
async def get_all_playlists(user: tidalapi.User, chunk_size: int=10) -> List[tidalapi.Playlist]:
""" Get all user playlists from Tidal in chunks """
print(f"Loading playlists from Tidal user")
params = {
"limit": chunk_size,
}
return await _get_all_chunks(f"users/{user.id}/playlists", session=user.session, parser=user.playlist.parse_factory, params=params)
async def get_all_playlist_tracks(playlist: tidalapi.Playlist, chunk_size: int=20) -> List[tidalapi.Track]:
""" Get all tracks from Tidal playlist in chunks """
params = {
"limit": chunk_size,
}
print(f"Loading tracks from Tidal playlist '{playlist.name}'")
return await _get_all_chunks(f"{playlist._base_url%playlist.id}/tracks", session=playlist.session, parser=playlist.session.parse_track, params=params)