|
|
|
|
@@ -178,12 +178,13 @@ async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[di
|
|
|
|
|
|
|
|
|
|
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
|
|
|
|
|
def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str):
|
|
|
|
|
fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
|
|
|
|
|
fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc))),type"
|
|
|
|
|
return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset)
|
|
|
|
|
|
|
|
|
|
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
|
|
|
|
|
return await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, lambda offset: _get_tracks_from_spotify_playlist(offset=offset, playlist_id=spotify_playlist["id"]))
|
|
|
|
|
|
|
|
|
|
items = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, lambda offset: _get_tracks_from_spotify_playlist(offset=offset, playlist_id=spotify_playlist["id"]))
|
|
|
|
|
track_filter = lambda item: item.get('type', 'track') == 'track' # type may be 'episode' also
|
|
|
|
|
return list(filter(track_filter, items))
|
|
|
|
|
|
|
|
|
|
def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]):
|
|
|
|
|
""" Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """
|
|
|
|
|
@@ -282,14 +283,18 @@ async def search_new_tracks_on_tidal(tidal_session: tidalapi.Session, spotify_tr
|
|
|
|
|
|
|
|
|
|
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config: dict):
|
|
|
|
|
""" sync given playlist to tidal """
|
|
|
|
|
# Create a new Tidal playlist if required
|
|
|
|
|
if not tidal_playlist:
|
|
|
|
|
# Get the tracks from both Spotify and Tidal, creating a new Tidal playlist if necessary
|
|
|
|
|
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
|
|
|
|
|
if len(spotify_tracks) == 0:
|
|
|
|
|
return # nothing to do
|
|
|
|
|
if tidal_playlist:
|
|
|
|
|
old_tidal_tracks = await get_all_playlist_tracks(tidal_playlist)
|
|
|
|
|
else:
|
|
|
|
|
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
|
|
|
|
|
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])
|
|
|
|
|
old_tidal_tracks = []
|
|
|
|
|
|
|
|
|
|
# Extract the new tracks from the playlist that we haven't already seen before
|
|
|
|
|
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
|
|
|
|
|
old_tidal_tracks = await get_all_playlist_tracks(tidal_playlist)
|
|
|
|
|
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
|
|
|
|
|
await search_new_tracks_on_tidal(tidal_session, spotify_tracks, spotify_playlist['name'], config)
|
|
|
|
|
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
|
|
|
|
|
@@ -365,20 +370,25 @@ def get_user_playlist_mappings(spotify_session: spotipy.Spotify, tidal_session:
|
|
|
|
|
return results
|
|
|
|
|
|
|
|
|
|
async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config):
|
|
|
|
|
# get all the user playlists from the Spotify account
|
|
|
|
|
# get all the playlists from the Spotify account
|
|
|
|
|
playlists = []
|
|
|
|
|
print("Loading Spotify playlists")
|
|
|
|
|
results = spotify_session.current_user_playlists()
|
|
|
|
|
first_results = spotify_session.current_user_playlists()
|
|
|
|
|
exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])])
|
|
|
|
|
playlists.extend([p for p in results['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list])
|
|
|
|
|
playlists.extend([p for p in first_results['items']])
|
|
|
|
|
user_id = spotify_session.current_user()['id']
|
|
|
|
|
|
|
|
|
|
# get all the remaining playlists in parallel
|
|
|
|
|
if results['next']:
|
|
|
|
|
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
|
|
|
|
|
if first_results['next']:
|
|
|
|
|
offsets = [ first_results['limit'] * n for n in range(1, math.ceil(first_results['total']/first_results['limit'])) ]
|
|
|
|
|
extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.current_user_playlists, offset=offset) for offset in offsets ] )
|
|
|
|
|
for extra_result in extra_results:
|
|
|
|
|
playlists.extend([p for p in extra_result['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list])
|
|
|
|
|
return playlists
|
|
|
|
|
playlists.extend([p for p in extra_result['items']])
|
|
|
|
|
|
|
|
|
|
# filter out playlists that don't belong to us or are on the exclude list
|
|
|
|
|
my_playlist_filter = lambda p: p['owner']['id'] == user_id
|
|
|
|
|
exclude_filter = lambda p: not p['id'] in exclude_list
|
|
|
|
|
return list(filter( exclude_filter, filter( my_playlist_filter, playlists )))
|
|
|
|
|
|
|
|
|
|
def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
|
|
|
|
|
# get the list of playlist sync mappings from the configuration file
|
|
|
|
|
|