From 54526a0306d463c7c4b0c060b7b5504a17d8b682 Mon Sep 17 00:00:00 2001 From: Cobal Date: Wed, 5 Jun 2024 12:48:25 +0200 Subject: [PATCH] feat: :sparkles: Sync favorites tracks to Tidal --- .gitignore | 3 + src/spotify_to_tidal/__main__.py | 10 +- src/spotify_to_tidal/auth.py | 2 +- src/spotify_to_tidal/sync.py | 176 +++++++++++++++++-------- src/spotify_to_tidal/tidalapi_patch.py | 28 +++- 5 files changed, 153 insertions(+), 66 deletions(-) diff --git a/.gitignore b/.gitignore index b57628e..65de735 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,6 @@ venv/ .mypy_cache/ .dmypy.json dmypy.json + +# Visual Studio Code +.vscode/ diff --git a/src/spotify_to_tidal/__main__.py b/src/spotify_to_tidal/__main__.py index e5ae08f..7abc2fb 100644 --- a/src/spotify_to_tidal/__main__.py +++ b/src/spotify_to_tidal/__main__.py @@ -9,6 +9,7 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument('--config', default='config.yml', help='location of the config file') parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config') + parser.add_argument('--sync-favorites', action='store_true', help='enable synchronization of favorites') args = parser.parse_args() with open(args.config, 'r') as f: @@ -24,13 +25,16 @@ def main(): spotify_playlist = spotify_session.playlist(args.uri) tidal_playlists = _sync.get_tidal_playlists_dict(tidal_session) tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) - _sync.sync_list(spotify_session, tidal_session, [tidal_playlist], config) + _sync.sync_playlists_wrapper(spotify_session, tidal_session, [tidal_playlist], config) elif config.get('sync_playlists', None): # if the config contains a sync_playlists list of mappings then use that - _sync.sync_list(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config) + _sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config) else: # otherwise just use the user playlists in the Spotify account - _sync.sync_list(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config) + _sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config) + + if args.sync_favorites: + _sync.sync_favorites_wrapper(spotify_session, tidal_session, config) if __name__ == '__main__': main() diff --git a/src/spotify_to_tidal/auth.py b/src/spotify_to_tidal/auth.py index 38e22fe..1013039 100644 --- a/src/spotify_to_tidal/auth.py +++ b/src/spotify_to_tidal/auth.py @@ -13,7 +13,7 @@ __all__ = [ def open_spotify_session(config) -> spotipy.Spotify: credentials_manager = spotipy.SpotifyOAuth(username=config['username'], - scope='playlist-read-private', + scope='playlist-read-private, user-library-read', client_id=config['client_id'], client_secret=config['client_secret'], redirect_uri=config['redirect_uri'], diff --git a/src/spotify_to_tidal/sync.py b/src/spotify_to_tidal/sync.py index deb99b1..64a7aa0 100755 --- a/src/spotify_to_tidal/sync.py +++ b/src/spotify_to_tidal/sync.py @@ -5,7 +5,7 @@ from .cache import failure_cache, track_match_cache import datetime from difflib import SequenceMatcher from functools import partial -from typing import List, Sequence, Set, Mapping +from typing import Callable, List, Sequence, Set, Mapping import math import requests import sys @@ -165,23 +165,41 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs): time.sleep(sleep_schedule.get(remaining, 1)) return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs) + +async def _fetch_all_from_spotify_in_chunks(spotify_session: spotipy.Spotify, fetch_function: Callable) -> List[dict]: + output = [] + results = fetch_function(0, spotify_session) + output.extend([item['track'] for item in results['items'] if item['track'] is not None]) + + # Get all the remaining tracks in parallel + if results['next']: + offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))] + extra_results = await asyncio.gather( + *[asyncio.to_thread(fetch_function, offset, spotify_session) for offset in offsets] + ) + for extra_result in extra_results: + output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None]) + + return output + + async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist): def _get_tracks_from_spotify_playlist(offset: int, spotify_session: spotipy.Spotify, playlist_id: str): - fields="next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))" - return spotify_session.playlist_tracks(playlist_id, fields, offset=offset) + fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))" + return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset) - output = [] print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'") - results = _get_tracks_from_spotify_playlist( 0, spotify_session, spotify_playlist["id"] ) - output.extend([r['track'] for r in results['items'] if r['track'] is not None]) + return await _fetch_all_from_spotify_in_chunks(spotify_session=spotify_session, fetch_function=lambda offset, session: _get_tracks_from_spotify_playlist(offset=offset, spotify_session=session, playlist_id=spotify_playlist["id"])) - # get all the remaining tracks in parallel - if results['next']: - offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ] - extra_results = await atqdm.gather( *[asyncio.to_thread(_get_tracks_from_spotify_playlist, offset, spotify_session=spotify_session, playlist_id=spotify_playlist["id"]) for offset in offsets ] ) - for extra_result in extra_results: - output.extend([r['track'] for r in extra_result['items'] if r['track'] is not None]) - return output + +async def get_tracks_from_spotify_favorites(spotify_session: spotipy.Spotify) -> List[dict]: + def _get_favorite_tracks(offset: int, spotify_session: spotipy.Spotify): + return spotify_session.current_user_saved_tracks(offset=offset) + + print("Loading favorite tracks from Spotify") + tracks = await _fetch_all_from_spotify_in_chunks(spotify_session=spotify_session, fetch_function=_get_favorite_tracks) + tracks.reverse() + return tracks def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]): """ Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """ @@ -238,7 +256,37 @@ def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.Spotify seen_tracks.add(tidal_id) return output -async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config): +async def sync_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track], config: dict, tidal_session: tidalapi.Session, tidal_playlist: tidalapi.UserPlaylist, sync_favorites: bool = False): + new_spotify_tracks = get_new_spotify_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks) + if not new_spotify_tracks: + print("No new tracks to search in Spotify tracks") + return + + new_tidal_tracks = await search_new_tracks_on_tidal(new_spotify_tracks=new_spotify_tracks, tidal_session=tidal_session, config=config) + + update_tidal_playlist( + old_tidal_tracks=old_tidal_tracks, + new_tidal_tracks=new_tidal_tracks, + tidal_playlist=tidal_playlist, + tidal_session=tidal_session, + sync_favorites=sync_favorites + ) + +def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> List[t_spotify.SpotifyTrack]: + ''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures ''' + populate_track_match_cache(spotify_tracks, old_tidal_tracks) + results = [] + for spotify_track in spotify_tracks: + if not spotify_track['id']: continue + if not track_match_cache.get(spotify_track['id']) and not failure_cache.has_match_failure(spotify_track['id']): + results.append(spotify_track) + return results + +async def search_new_tracks_on_tidal(new_spotify_tracks: List[t_spotify.SpotifyTrack], tidal_session: tidalapi.Session, config: dict) -> List[tidalapi.Track | None]: + ''' Searches for the new Spotify tracks on Tidal ''' + task_description = f"Searching Tidal for {len(new_spotify_tracks)} new Spotify tracks" + semaphore = asyncio.Semaphore(value=config.get('max_concurrency', 10)) + async def _run_rate_limiter(semaphore): ''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit''' _sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket' @@ -251,50 +299,68 @@ async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalap t0 = t [semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket' + rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore)) + search_results = await atqdm.gather( + *[repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in new_spotify_tracks], + desc=task_description + ) + rate_limiter_task.cancel() + + # Add the search results to the cache + for idx, spotify_track in enumerate(new_spotify_tracks): + if search_results[idx]: + track_match_cache.insert((spotify_track['id'], search_results[idx].id)) + else: + color = ('\033[91m', '\033[0m') + print(color[0] + f"Could not find track {spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}" + color[1]) + + return search_results + +def update_tidal_playlist(old_tidal_tracks: Sequence[tidalapi.Track], new_tidal_tracks: List[tidalapi.Track | None], tidal_playlist: tidalapi.UserPlaylist, tidal_session: tidalapi.Session, sync_favorites: bool = False): + ''' Updates the Tidal playlist or favorites with the new tracks ''' + old_tidal_track_ids = [t.id for t in old_tidal_tracks] + new_tidal_track_ids = [t.id for t in new_tidal_tracks if t] + + if new_tidal_track_ids == old_tidal_track_ids: + print("No changes to write to Tidal") + elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids: + # Append new tracks to the existing playlist or favorites if possible + add_multiple_tracks_to_playlist( + playlist=tidal_playlist, + session=tidal_session, + track_ids=new_tidal_track_ids[len(old_tidal_track_ids):], + sync_favorites=sync_favorites + ) + else: + # Erase old playlist or favorites and add new tracks from scratch if any reordering occured + set_tidal_playlist( + playlist=tidal_playlist, + session=tidal_session, + track_ids=new_tidal_track_ids, + sync_favorites=sync_favorites + ) + +def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config): + for spotify_playlist, tidal_playlist in playlists: + asyncio.run(main=sync_playlist(spotify_session=spotify_session, tidal_session=tidal_session, spotify_playlist=spotify_playlist, tidal_playlist=tidal_playlist, config=config) ) + +async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config): # Create a new Tidal playlist if required if not tidal_playlist: print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist") - tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description']) + tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description']) - # Extract the new tracks from the playlist that we haven't already seen before - spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist) + spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session=spotify_session, spotify_playlist=spotify_playlist) old_tidal_tracks = tidal_playlist.tracks() - tracks_to_search = get_new_tracks_from_spotify_playlist(spotify_tracks, old_tidal_tracks) - if not tracks_to_search: - print("No new tracks to search in Spotify playlist '{}'".format(spotify_playlist['name'])) - return + await sync_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks, config=config, tidal_session=tidal_session, tidal_playlist=tidal_playlist) - # Search for each of the tracks on Tidal concurrently - task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), spotify_playlist['name']) - semaphore = asyncio.Semaphore(config.get('max_concurrency', 10)) - rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore)) - search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description ) - rate_limiter_task.cancel() +def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config): + asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config)) - # Add the search results to the cache - for idx, spotify_track in enumerate(tracks_to_search): - if search_results[idx]: - track_match_cache.insert( (spotify_track['id'], search_results[idx].id) ) - else: - color = ('\033[91m', '\033[0m') - print(color[0] + "Could not find track {}: {} - {}".format(spotify_track['id'], ",".join([a['name'] for a in spotify_track['artists']]), spotify_track['name']) + color[1]) - - # Update the Tidal playlist if there are changes - old_tidal_track_ids = [t.id for t in old_tidal_tracks] - new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks) - if new_tidal_track_ids == old_tidal_track_ids: - print("No changes to write to Tidal playlist") - elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids: - # Append new tracks to the existing playlist if possible - add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):]) - else: - # Erase old playlist and add new tracks from scratch if any reordering occured - set_tidal_playlist(tidal_playlist, new_tidal_track_ids) - -def sync_list(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config): - for spotify_playlist, tidal_playlist in playlists: - # sync the spotify playlist to tidal - asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) ) +async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config): + spotify_tracks = await get_tracks_from_spotify_favorites(spotify_session=spotify_session) + old_tidal_tracks = tidal_session.user.favorites.tracks(order='DATE') + await sync_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks, config=config, tidal_session=tidal_session, tidal_playlist=tidal_session.user.favorites, sync_favorites=True) def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]): if spotify_playlist['name'] in tidal_playlists: @@ -319,7 +385,7 @@ async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config): results = spotify_session.user_playlists(config['spotify']['username']) exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])]) playlists.extend([p for p in results['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list]) - + # get all the remaining playlists in parallel if results['next']: offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ] @@ -333,14 +399,14 @@ def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: t def get_playlist_ids(config): return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']] output = [] - for spotify_id, tidal_id in get_playlist_ids(config): + for spotify_id, tidal_id in get_playlist_ids(config=config): try: - spotify_playlist = spotify_session.playlist(spotify_id) + spotify_playlist = spotify_session.playlist(playlist_id=spotify_id) except spotipy.SpotifyException as e: print(f"Error getting Spotify playlist {spotify_id}") raise e try: - tidal_playlist = tidal_session.playlist(tidal_id) + tidal_playlist = tidal_session.playlist(playlist_id=tidal_id) except Exception as e: print(f"Error getting Tidal playlist {tidal_id}") raise e diff --git a/src/spotify_to_tidal/tidalapi_patch.py b/src/spotify_to_tidal/tidalapi_patch.py index e997c5f..df7677e 100644 --- a/src/spotify_to_tidal/tidalapi_patch.py +++ b/src/spotify_to_tidal/tidalapi_patch.py @@ -14,16 +14,30 @@ def clear_tidal_playlist(playlist: tidalapi.UserPlaylist, chunk_size: int=20): indices = range(min(playlist.num_tracks, chunk_size)) _remove_indices_from_playlist(playlist, indices) progress.update(len(indices)) - -def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids: List[int], chunk_size: int=20): + +def clear_favorites(session: tidalapi.Session): + favorite_tracks = session.user.favorites.tracks() + track_ids = [track.id for track in favorite_tracks] + for track_id in track_ids: + session.user.favorites.remove_track(track_id) + +def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, session: tidalapi.Session, track_ids: List[int], chunk_size: int = 20, sync_favorites: bool = False): offset = 0 - with tqdm(desc="Adding new tracks to Tidal playlist", total=len(track_ids)) as progress: + with tqdm(desc="Adding new tracks to Tidal", total=len(track_ids)) as progress: while offset < len(track_ids): count = min(chunk_size, len(track_ids) - offset) - playlist.add(track_ids[offset:offset+chunk_size]) + if sync_favorites: + for track_id in track_ids[offset:offset + chunk_size]: + session.user.favorites.add_track(track_id) + else: + playlist.add(track_ids[offset:offset + chunk_size]) offset += count progress.update(count) -def set_tidal_playlist(playlist: tidalapi.Playlist, track_ids: List[int]): - clear_tidal_playlist(playlist) - add_multiple_tracks_to_playlist(playlist, track_ids) +def set_tidal_playlist(playlist: tidalapi.Playlist, session: tidalapi.Session, track_ids: List[int], sync_favorites: bool=False): + if sync_favorites: + clear_favorites(session) + else: + clear_tidal_playlist(playlist=playlist) + add_multiple_tracks_to_playlist(playlist=playlist, session=session, track_ids=track_ids, sync_favorites=sync_favorites) +