From 1cdc62fd0cd34abec9c75a71bfcba572975f7f97 Mon Sep 17 00:00:00 2001 From: Tim Rae Date: Sat, 8 Jun 2024 00:10:29 +0200 Subject: [PATCH] Cleanup partitioning of code used in favorites / playlist list --- src/spotify_to_tidal/__main__.py | 15 ++- src/spotify_to_tidal/sync.py | 167 ++++++++++++------------- src/spotify_to_tidal/tidalapi_patch.py | 26 +--- 3 files changed, 93 insertions(+), 115 deletions(-) diff --git a/src/spotify_to_tidal/__main__.py b/src/spotify_to_tidal/__main__.py index 7abc2fb..2aa7121 100644 --- a/src/spotify_to_tidal/__main__.py +++ b/src/spotify_to_tidal/__main__.py @@ -9,7 +9,8 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument('--config', default='config.yml', help='location of the config file') parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config') - parser.add_argument('--sync-favorites', action='store_true', help='enable synchronization of favorites') + parser.add_argument('--sync-favorites', action='store_true', help='synchronize the favorites') + parser.add_argument('--disable-favorites-sync', action='store_true', help='disable synchronization of favorites (only valid when no other args passed)') # todo: use subparser args = parser.parse_args() with open(args.config, 'r') as f: @@ -29,12 +30,14 @@ def main(): elif config.get('sync_playlists', None): # if the config contains a sync_playlists list of mappings then use that _sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config) - else: - # otherwise just use the user playlists in the Spotify account - _sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config) - - if args.sync_favorites: + elif args.sync_favorites: + # sync just the favorites _sync.sync_favorites_wrapper(spotify_session, tidal_session, config) + else: + # otherwise sync all the user playlists in the Spotify account and favorites if not disabled + _sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config) + if not args.disable_favorites_sync: + _sync.sync_favorites_wrapper(spotify_session, tidal_session, config) if __name__ == '__main__': main() diff --git a/src/spotify_to_tidal/sync.py b/src/spotify_to_tidal/sync.py index 64a7aa0..c624fb2 100755 --- a/src/spotify_to_tidal/sync.py +++ b/src/spotify_to_tidal/sync.py @@ -11,9 +11,10 @@ import requests import sys import spotipy import tidalapi -from .tidalapi_patch import add_multiple_tracks_to_playlist, set_tidal_playlist +from .tidalapi_patch import add_multiple_tracks_to_playlist, clear_tidal_playlist import time from tqdm.asyncio import tqdm as atqdm +from tqdm import tqdm import traceback import unicodedata import math @@ -166,16 +167,17 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs): return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs) -async def _fetch_all_from_spotify_in_chunks(spotify_session: spotipy.Spotify, fetch_function: Callable) -> List[dict]: +async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[dict]: output = [] - results = fetch_function(0, spotify_session) + results = fetch_function(0) output.extend([item['track'] for item in results['items'] if item['track'] is not None]) # Get all the remaining tracks in parallel if results['next']: offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))] - extra_results = await asyncio.gather( - *[asyncio.to_thread(fetch_function, offset, spotify_session) for offset in offsets] + extra_results = await atqdm.gather( + *[asyncio.to_thread(fetch_function, offset) for offset in offsets], + desc="Fetching additional data chunks" ) for extra_result in extra_results: output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None]) @@ -189,15 +191,15 @@ async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spo return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset) print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'") - return await _fetch_all_from_spotify_in_chunks(spotify_session=spotify_session, fetch_function=lambda offset, session: _get_tracks_from_spotify_playlist(offset=offset, spotify_session=session, playlist_id=spotify_playlist["id"])) + return await _fetch_all_from_spotify_in_chunks(lambda offset, session=spotify_session: _get_tracks_from_spotify_playlist(offset=offset, spotify_session=session, playlist_id=spotify_playlist["id"])) async def get_tracks_from_spotify_favorites(spotify_session: spotipy.Spotify) -> List[dict]: - def _get_favorite_tracks(offset: int, spotify_session: spotipy.Spotify): + def _get_favorite_tracks(offset: int): return spotify_session.current_user_saved_tracks(offset=offset) print("Loading favorite tracks from Spotify") - tracks = await _fetch_all_from_spotify_in_chunks(spotify_session=spotify_session, fetch_function=_get_favorite_tracks) + tracks = await _fetch_all_from_spotify_in_chunks(_get_favorite_tracks) tracks.reverse() return tracks @@ -228,9 +230,8 @@ def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack] for track in spotify_tracks: _populate_one_track_from_spotify(track) -def get_new_tracks_from_spotify_playlist(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> list[t_spotify.SpotifyTrack]: - ''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures ''' - populate_track_match_cache(spotify_tracks, old_tidal_tracks) +def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack]) -> List[t_spotify.SpotifyTrack]: + ''' Extracts only the tracks that have not already been seen in our Tidal caches ''' results = [] for spotify_track in spotify_tracks: if not spotify_track['id']: continue @@ -256,37 +257,8 @@ def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.Spotify seen_tracks.add(tidal_id) return output -async def sync_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track], config: dict, tidal_session: tidalapi.Session, tidal_playlist: tidalapi.UserPlaylist, sync_favorites: bool = False): - new_spotify_tracks = get_new_spotify_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks) - if not new_spotify_tracks: - print("No new tracks to search in Spotify tracks") - return - - new_tidal_tracks = await search_new_tracks_on_tidal(new_spotify_tracks=new_spotify_tracks, tidal_session=tidal_session, config=config) - - update_tidal_playlist( - old_tidal_tracks=old_tidal_tracks, - new_tidal_tracks=new_tidal_tracks, - tidal_playlist=tidal_playlist, - tidal_session=tidal_session, - sync_favorites=sync_favorites - ) - -def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> List[t_spotify.SpotifyTrack]: - ''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures ''' - populate_track_match_cache(spotify_tracks, old_tidal_tracks) - results = [] - for spotify_track in spotify_tracks: - if not spotify_track['id']: continue - if not track_match_cache.get(spotify_track['id']) and not failure_cache.has_match_failure(spotify_track['id']): - results.append(spotify_track) - return results - -async def search_new_tracks_on_tidal(new_spotify_tracks: List[t_spotify.SpotifyTrack], tidal_session: tidalapi.Session, config: dict) -> List[tidalapi.Track | None]: - ''' Searches for the new Spotify tracks on Tidal ''' - task_description = f"Searching Tidal for {len(new_spotify_tracks)} new Spotify tracks" - semaphore = asyncio.Semaphore(value=config.get('max_concurrency', 10)) - +async def search_new_tracks_on_tidal(tidal_session: tidalapi.Session, spotify_tracks: Sequence[t_spotify.SpotifyTrack], playlist_name: str, config: dict) -> List[t_spotify.SpotifyTrack]: + """ Generic function for searching for each item in a list of Spotify tracks which have not already been seen """ async def _run_rate_limiter(semaphore): ''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit''' _sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket' @@ -299,69 +271,90 @@ async def search_new_tracks_on_tidal(new_spotify_tracks: List[t_spotify.SpotifyT t0 = t [semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket' + # Extract the new tracks that do not already exist in the old tidal tracklist + tracks_to_search = get_new_spotify_tracks(spotify_tracks) + if not tracks_to_search: + return get_tracks_for_new_tidal_playlist(spotify_tracks) + + # Search for each of the tracks on Tidal concurrently + task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), playlist_name) + semaphore = asyncio.Semaphore(config.get('max_concurrency', 10)) rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore)) - search_results = await atqdm.gather( - *[repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in new_spotify_tracks], - desc=task_description - ) + search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description ) rate_limiter_task.cancel() - + # Add the search results to the cache - for idx, spotify_track in enumerate(new_spotify_tracks): + for idx, spotify_track in enumerate(tracks_to_search): if search_results[idx]: - track_match_cache.insert((spotify_track['id'], search_results[idx].id)) + track_match_cache.insert( (spotify_track['id'], search_results[idx].id) ) else: color = ('\033[91m', '\033[0m') print(color[0] + f"Could not find track {spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}" + color[1]) - return search_results + return get_tracks_for_new_tidal_playlist(spotify_tracks) -def update_tidal_playlist(old_tidal_tracks: Sequence[tidalapi.Track], new_tidal_tracks: List[tidalapi.Track | None], tidal_playlist: tidalapi.UserPlaylist, tidal_session: tidalapi.Session, sync_favorites: bool = False): - ''' Updates the Tidal playlist or favorites with the new tracks ''' - old_tidal_track_ids = [t.id for t in old_tidal_tracks] - new_tidal_track_ids = [t.id for t in new_tidal_tracks if t] - - if new_tidal_track_ids == old_tidal_track_ids: - print("No changes to write to Tidal") - elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids: - # Append new tracks to the existing playlist or favorites if possible - add_multiple_tracks_to_playlist( - playlist=tidal_playlist, - session=tidal_session, - track_ids=new_tidal_track_ids[len(old_tidal_track_ids):], - sync_favorites=sync_favorites - ) - else: - # Erase old playlist or favorites and add new tracks from scratch if any reordering occured - set_tidal_playlist( - playlist=tidal_playlist, - session=tidal_session, - track_ids=new_tidal_track_ids, - sync_favorites=sync_favorites - ) - -def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config): - for spotify_playlist, tidal_playlist in playlists: - asyncio.run(main=sync_playlist(spotify_session=spotify_session, tidal_session=tidal_session, spotify_playlist=spotify_playlist, tidal_playlist=tidal_playlist, config=config) ) - -async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config): +async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config: dict): + """ sync given playlist to tidal """ # Create a new Tidal playlist if required if not tidal_playlist: print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist") - tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description']) + tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description']) - spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session=spotify_session, spotify_playlist=spotify_playlist) + # Extract the new tracks from the playlist that we haven't already seen before + spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist) old_tidal_tracks = tidal_playlist.tracks() - await sync_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks, config=config, tidal_session=tidal_session, tidal_playlist=tidal_playlist) + populate_track_match_cache(spotify_tracks, old_tidal_tracks) + new_tidal_track_ids = await search_new_tracks_on_tidal(tidal_session, spotify_tracks, spotify_playlist['name'], config) + + # Update the Tidal playlist if there are changes + old_tidal_track_ids = [t.id for t in old_tidal_tracks] + if new_tidal_track_ids == old_tidal_track_ids: + print("No changes to write to Tidal playlist") + elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids: + # Append new tracks to the existing playlist if possible + add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):]) + else: + # Erase old playlist and add new tracks from scratch if any reordering occured + clear_tidal_playlist(tidal_playlist) + add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids) + +async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict): + """ sync user favorites to tidal """ + def _clear_favorites(session: tidalapi.Session): + favorite_tracks = session.user.favorites.tracks() + track_ids = [track.id for track in favorite_tracks] + for track_id in tqdm(track_ids, desc="Erasing existing Tidal favorites"): + session.user.favorites.remove_track(track_id) + def _add_to_favorites(track_ids: Sequence[int], session: tidalapi.Session): + for track_id in tqdm(track_ids, desc="Adding tracks to Tidal favorites"): + session.user.favorites.add_track(track_id) + + spotify_tracks = await get_tracks_from_spotify_favorites(spotify_session=spotify_session) + print("Getting existing favorite tracks from Tidal") + old_tidal_tracks = tidal_session.user.favorites.tracks(order='DATE') + populate_track_match_cache(spotify_tracks, old_tidal_tracks) + new_tidal_track_ids = await search_new_tracks_on_tidal(tidal_session, spotify_tracks, "Favorites", config) + + # Update the Tidal playlist if there are changes + old_tidal_track_ids = [t.id for t in old_tidal_tracks] + if new_tidal_track_ids == old_tidal_track_ids: + print("No changes to write to Tidal favorites") + elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids: + # Append new tracks to the existing playlist if possible + _add_to_favorites(new_tidal_track_ids[len(old_tidal_track_ids):], tidal_session) + else: + # Erase old playlist and add new tracks from scratch if any reordering occured + _clear_favorites(tidal_session) + _add_to_favorites(new_tidal_track_ids, tidal_session) + +def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config: dict): + for spotify_playlist, tidal_playlist in playlists: + # sync the spotify playlist to tidal + asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) ) def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config): asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config)) -async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config): - spotify_tracks = await get_tracks_from_spotify_favorites(spotify_session=spotify_session) - old_tidal_tracks = tidal_session.user.favorites.tracks(order='DATE') - await sync_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks, config=config, tidal_session=tidal_session, tidal_playlist=tidal_session.user.favorites, sync_favorites=True) - def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]): if spotify_playlist['name'] in tidal_playlists: # if there's an existing tidal playlist with the name of the current playlist then use that diff --git a/src/spotify_to_tidal/tidalapi_patch.py b/src/spotify_to_tidal/tidalapi_patch.py index df7677e..d4c4f44 100644 --- a/src/spotify_to_tidal/tidalapi_patch.py +++ b/src/spotify_to_tidal/tidalapi_patch.py @@ -14,30 +14,12 @@ def clear_tidal_playlist(playlist: tidalapi.UserPlaylist, chunk_size: int=20): indices = range(min(playlist.num_tracks, chunk_size)) _remove_indices_from_playlist(playlist, indices) progress.update(len(indices)) - -def clear_favorites(session: tidalapi.Session): - favorite_tracks = session.user.favorites.tracks() - track_ids = [track.id for track in favorite_tracks] - for track_id in track_ids: - session.user.favorites.remove_track(track_id) - -def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, session: tidalapi.Session, track_ids: List[int], chunk_size: int = 20, sync_favorites: bool = False): + +def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids: List[int], chunk_size: int=20): offset = 0 - with tqdm(desc="Adding new tracks to Tidal", total=len(track_ids)) as progress: + with tqdm(desc="Adding new tracks to Tidal playlist", total=len(track_ids)) as progress: while offset < len(track_ids): count = min(chunk_size, len(track_ids) - offset) - if sync_favorites: - for track_id in track_ids[offset:offset + chunk_size]: - session.user.favorites.add_track(track_id) - else: - playlist.add(track_ids[offset:offset + chunk_size]) + playlist.add(track_ids[offset:offset+chunk_size]) offset += count progress.update(count) - -def set_tidal_playlist(playlist: tidalapi.Playlist, session: tidalapi.Session, track_ids: List[int], sync_favorites: bool=False): - if sync_favorites: - clear_favorites(session) - else: - clear_tidal_playlist(playlist=playlist) - add_multiple_tracks_to_playlist(playlist=playlist, session=session, track_ids=track_ids, sync_favorites=sync_favorites) -