Cleanup partitioning of code used in favorites / playlist list

This commit is contained in:
Tim Rae
2024-06-08 00:10:29 +02:00
parent 54526a0306
commit 1cdc62fd0c
3 changed files with 93 additions and 115 deletions

View File

@@ -9,7 +9,8 @@ def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--config', default='config.yml', help='location of the config file') parser.add_argument('--config', default='config.yml', help='location of the config file')
parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config') parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config')
parser.add_argument('--sync-favorites', action='store_true', help='enable synchronization of favorites') parser.add_argument('--sync-favorites', action='store_true', help='synchronize the favorites')
parser.add_argument('--disable-favorites-sync', action='store_true', help='disable synchronization of favorites (only valid when no other args passed)') # todo: use subparser
args = parser.parse_args() args = parser.parse_args()
with open(args.config, 'r') as f: with open(args.config, 'r') as f:
@@ -29,12 +30,14 @@ def main():
elif config.get('sync_playlists', None): elif config.get('sync_playlists', None):
# if the config contains a sync_playlists list of mappings then use that # if the config contains a sync_playlists list of mappings then use that
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config) _sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
else: elif args.sync_favorites:
# otherwise just use the user playlists in the Spotify account # sync just the favorites
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
if args.sync_favorites:
_sync.sync_favorites_wrapper(spotify_session, tidal_session, config) _sync.sync_favorites_wrapper(spotify_session, tidal_session, config)
else:
# otherwise sync all the user playlists in the Spotify account and favorites if not disabled
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
if not args.disable_favorites_sync:
_sync.sync_favorites_wrapper(spotify_session, tidal_session, config)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -11,9 +11,10 @@ import requests
import sys import sys
import spotipy import spotipy
import tidalapi import tidalapi
from .tidalapi_patch import add_multiple_tracks_to_playlist, set_tidal_playlist from .tidalapi_patch import add_multiple_tracks_to_playlist, clear_tidal_playlist
import time import time
from tqdm.asyncio import tqdm as atqdm from tqdm.asyncio import tqdm as atqdm
from tqdm import tqdm
import traceback import traceback
import unicodedata import unicodedata
import math import math
@@ -166,16 +167,17 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs) return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs)
async def _fetch_all_from_spotify_in_chunks(spotify_session: spotipy.Spotify, fetch_function: Callable) -> List[dict]: async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[dict]:
output = [] output = []
results = fetch_function(0, spotify_session) results = fetch_function(0)
output.extend([item['track'] for item in results['items'] if item['track'] is not None]) output.extend([item['track'] for item in results['items'] if item['track'] is not None])
# Get all the remaining tracks in parallel # Get all the remaining tracks in parallel
if results['next']: if results['next']:
offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))] offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))]
extra_results = await asyncio.gather( extra_results = await atqdm.gather(
*[asyncio.to_thread(fetch_function, offset, spotify_session) for offset in offsets] *[asyncio.to_thread(fetch_function, offset) for offset in offsets],
desc="Fetching additional data chunks"
) )
for extra_result in extra_results: for extra_result in extra_results:
output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None]) output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None])
@@ -189,15 +191,15 @@ async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spo
return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset) return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset)
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'") print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
return await _fetch_all_from_spotify_in_chunks(spotify_session=spotify_session, fetch_function=lambda offset, session: _get_tracks_from_spotify_playlist(offset=offset, spotify_session=session, playlist_id=spotify_playlist["id"])) return await _fetch_all_from_spotify_in_chunks(lambda offset, session=spotify_session: _get_tracks_from_spotify_playlist(offset=offset, spotify_session=session, playlist_id=spotify_playlist["id"]))
async def get_tracks_from_spotify_favorites(spotify_session: spotipy.Spotify) -> List[dict]: async def get_tracks_from_spotify_favorites(spotify_session: spotipy.Spotify) -> List[dict]:
def _get_favorite_tracks(offset: int, spotify_session: spotipy.Spotify): def _get_favorite_tracks(offset: int):
return spotify_session.current_user_saved_tracks(offset=offset) return spotify_session.current_user_saved_tracks(offset=offset)
print("Loading favorite tracks from Spotify") print("Loading favorite tracks from Spotify")
tracks = await _fetch_all_from_spotify_in_chunks(spotify_session=spotify_session, fetch_function=_get_favorite_tracks) tracks = await _fetch_all_from_spotify_in_chunks(_get_favorite_tracks)
tracks.reverse() tracks.reverse()
return tracks return tracks
@@ -228,9 +230,8 @@ def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack]
for track in spotify_tracks: for track in spotify_tracks:
_populate_one_track_from_spotify(track) _populate_one_track_from_spotify(track)
def get_new_tracks_from_spotify_playlist(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> list[t_spotify.SpotifyTrack]: def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack]) -> List[t_spotify.SpotifyTrack]:
''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures ''' ''' Extracts only the tracks that have not already been seen in our Tidal caches '''
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
results = [] results = []
for spotify_track in spotify_tracks: for spotify_track in spotify_tracks:
if not spotify_track['id']: continue if not spotify_track['id']: continue
@@ -256,37 +257,8 @@ def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.Spotify
seen_tracks.add(tidal_id) seen_tracks.add(tidal_id)
return output return output
async def sync_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track], config: dict, tidal_session: tidalapi.Session, tidal_playlist: tidalapi.UserPlaylist, sync_favorites: bool = False): async def search_new_tracks_on_tidal(tidal_session: tidalapi.Session, spotify_tracks: Sequence[t_spotify.SpotifyTrack], playlist_name: str, config: dict) -> List[t_spotify.SpotifyTrack]:
new_spotify_tracks = get_new_spotify_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks) """ Generic function for searching for each item in a list of Spotify tracks which have not already been seen """
if not new_spotify_tracks:
print("No new tracks to search in Spotify tracks")
return
new_tidal_tracks = await search_new_tracks_on_tidal(new_spotify_tracks=new_spotify_tracks, tidal_session=tidal_session, config=config)
update_tidal_playlist(
old_tidal_tracks=old_tidal_tracks,
new_tidal_tracks=new_tidal_tracks,
tidal_playlist=tidal_playlist,
tidal_session=tidal_session,
sync_favorites=sync_favorites
)
def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> List[t_spotify.SpotifyTrack]:
''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures '''
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
results = []
for spotify_track in spotify_tracks:
if not spotify_track['id']: continue
if not track_match_cache.get(spotify_track['id']) and not failure_cache.has_match_failure(spotify_track['id']):
results.append(spotify_track)
return results
async def search_new_tracks_on_tidal(new_spotify_tracks: List[t_spotify.SpotifyTrack], tidal_session: tidalapi.Session, config: dict) -> List[tidalapi.Track | None]:
''' Searches for the new Spotify tracks on Tidal '''
task_description = f"Searching Tidal for {len(new_spotify_tracks)} new Spotify tracks"
semaphore = asyncio.Semaphore(value=config.get('max_concurrency', 10))
async def _run_rate_limiter(semaphore): async def _run_rate_limiter(semaphore):
''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit''' ''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit'''
_sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket' _sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket'
@@ -299,69 +271,90 @@ async def search_new_tracks_on_tidal(new_spotify_tracks: List[t_spotify.SpotifyT
t0 = t t0 = t
[semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket' [semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket'
# Extract the new tracks that do not already exist in the old tidal tracklist
tracks_to_search = get_new_spotify_tracks(spotify_tracks)
if not tracks_to_search:
return get_tracks_for_new_tidal_playlist(spotify_tracks)
# Search for each of the tracks on Tidal concurrently
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), playlist_name)
semaphore = asyncio.Semaphore(config.get('max_concurrency', 10))
rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore)) rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore))
search_results = await atqdm.gather( search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description )
*[repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in new_spotify_tracks],
desc=task_description
)
rate_limiter_task.cancel() rate_limiter_task.cancel()
# Add the search results to the cache # Add the search results to the cache
for idx, spotify_track in enumerate(new_spotify_tracks): for idx, spotify_track in enumerate(tracks_to_search):
if search_results[idx]: if search_results[idx]:
track_match_cache.insert((spotify_track['id'], search_results[idx].id)) track_match_cache.insert( (spotify_track['id'], search_results[idx].id) )
else: else:
color = ('\033[91m', '\033[0m') color = ('\033[91m', '\033[0m')
print(color[0] + f"Could not find track {spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}" + color[1]) print(color[0] + f"Could not find track {spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}" + color[1])
return search_results return get_tracks_for_new_tidal_playlist(spotify_tracks)
def update_tidal_playlist(old_tidal_tracks: Sequence[tidalapi.Track], new_tidal_tracks: List[tidalapi.Track | None], tidal_playlist: tidalapi.UserPlaylist, tidal_session: tidalapi.Session, sync_favorites: bool = False): async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config: dict):
''' Updates the Tidal playlist or favorites with the new tracks ''' """ sync given playlist to tidal """
old_tidal_track_ids = [t.id for t in old_tidal_tracks]
new_tidal_track_ids = [t.id for t in new_tidal_tracks if t]
if new_tidal_track_ids == old_tidal_track_ids:
print("No changes to write to Tidal")
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
# Append new tracks to the existing playlist or favorites if possible
add_multiple_tracks_to_playlist(
playlist=tidal_playlist,
session=tidal_session,
track_ids=new_tidal_track_ids[len(old_tidal_track_ids):],
sync_favorites=sync_favorites
)
else:
# Erase old playlist or favorites and add new tracks from scratch if any reordering occured
set_tidal_playlist(
playlist=tidal_playlist,
session=tidal_session,
track_ids=new_tidal_track_ids,
sync_favorites=sync_favorites
)
def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config):
for spotify_playlist, tidal_playlist in playlists:
asyncio.run(main=sync_playlist(spotify_session=spotify_session, tidal_session=tidal_session, spotify_playlist=spotify_playlist, tidal_playlist=tidal_playlist, config=config) )
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config):
# Create a new Tidal playlist if required # Create a new Tidal playlist if required
if not tidal_playlist: if not tidal_playlist:
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist") print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description']) tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session=spotify_session, spotify_playlist=spotify_playlist) # Extract the new tracks from the playlist that we haven't already seen before
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
old_tidal_tracks = tidal_playlist.tracks() old_tidal_tracks = tidal_playlist.tracks()
await sync_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks, config=config, tidal_session=tidal_session, tidal_playlist=tidal_playlist) populate_track_match_cache(spotify_tracks, old_tidal_tracks)
new_tidal_track_ids = await search_new_tracks_on_tidal(tidal_session, spotify_tracks, spotify_playlist['name'], config)
# Update the Tidal playlist if there are changes
old_tidal_track_ids = [t.id for t in old_tidal_tracks]
if new_tidal_track_ids == old_tidal_track_ids:
print("No changes to write to Tidal playlist")
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
# Append new tracks to the existing playlist if possible
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):])
else:
# Erase old playlist and add new tracks from scratch if any reordering occured
clear_tidal_playlist(tidal_playlist)
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids)
async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict):
""" sync user favorites to tidal """
def _clear_favorites(session: tidalapi.Session):
favorite_tracks = session.user.favorites.tracks()
track_ids = [track.id for track in favorite_tracks]
for track_id in tqdm(track_ids, desc="Erasing existing Tidal favorites"):
session.user.favorites.remove_track(track_id)
def _add_to_favorites(track_ids: Sequence[int], session: tidalapi.Session):
for track_id in tqdm(track_ids, desc="Adding tracks to Tidal favorites"):
session.user.favorites.add_track(track_id)
spotify_tracks = await get_tracks_from_spotify_favorites(spotify_session=spotify_session)
print("Getting existing favorite tracks from Tidal")
old_tidal_tracks = tidal_session.user.favorites.tracks(order='DATE')
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
new_tidal_track_ids = await search_new_tracks_on_tidal(tidal_session, spotify_tracks, "Favorites", config)
# Update the Tidal playlist if there are changes
old_tidal_track_ids = [t.id for t in old_tidal_tracks]
if new_tidal_track_ids == old_tidal_track_ids:
print("No changes to write to Tidal favorites")
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
# Append new tracks to the existing playlist if possible
_add_to_favorites(new_tidal_track_ids[len(old_tidal_track_ids):], tidal_session)
else:
# Erase old playlist and add new tracks from scratch if any reordering occured
_clear_favorites(tidal_session)
_add_to_favorites(new_tidal_track_ids, tidal_session)
def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config: dict):
for spotify_playlist, tidal_playlist in playlists:
# sync the spotify playlist to tidal
asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) )
def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config): def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config)) asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config))
async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
spotify_tracks = await get_tracks_from_spotify_favorites(spotify_session=spotify_session)
old_tidal_tracks = tidal_session.user.favorites.tracks(order='DATE')
await sync_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks, config=config, tidal_session=tidal_session, tidal_playlist=tidal_session.user.favorites, sync_favorites=True)
def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]): def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]):
if spotify_playlist['name'] in tidal_playlists: if spotify_playlist['name'] in tidal_playlists:
# if there's an existing tidal playlist with the name of the current playlist then use that # if there's an existing tidal playlist with the name of the current playlist then use that

View File

@@ -14,30 +14,12 @@ def clear_tidal_playlist(playlist: tidalapi.UserPlaylist, chunk_size: int=20):
indices = range(min(playlist.num_tracks, chunk_size)) indices = range(min(playlist.num_tracks, chunk_size))
_remove_indices_from_playlist(playlist, indices) _remove_indices_from_playlist(playlist, indices)
progress.update(len(indices)) progress.update(len(indices))
def clear_favorites(session: tidalapi.Session): def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids: List[int], chunk_size: int=20):
favorite_tracks = session.user.favorites.tracks()
track_ids = [track.id for track in favorite_tracks]
for track_id in track_ids:
session.user.favorites.remove_track(track_id)
def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, session: tidalapi.Session, track_ids: List[int], chunk_size: int = 20, sync_favorites: bool = False):
offset = 0 offset = 0
with tqdm(desc="Adding new tracks to Tidal", total=len(track_ids)) as progress: with tqdm(desc="Adding new tracks to Tidal playlist", total=len(track_ids)) as progress:
while offset < len(track_ids): while offset < len(track_ids):
count = min(chunk_size, len(track_ids) - offset) count = min(chunk_size, len(track_ids) - offset)
if sync_favorites: playlist.add(track_ids[offset:offset+chunk_size])
for track_id in track_ids[offset:offset + chunk_size]:
session.user.favorites.add_track(track_id)
else:
playlist.add(track_ids[offset:offset + chunk_size])
offset += count offset += count
progress.update(count) progress.update(count)
def set_tidal_playlist(playlist: tidalapi.Playlist, session: tidalapi.Session, track_ids: List[int], sync_favorites: bool=False):
if sync_favorites:
clear_favorites(session)
else:
clear_tidal_playlist(playlist=playlist)
add_multiple_tracks_to_playlist(playlist=playlist, session=session, track_ids=track_ids, sync_favorites=sync_favorites)