|
|
|
|
@@ -5,7 +5,7 @@ from .cache import failure_cache, track_match_cache
|
|
|
|
|
import datetime
|
|
|
|
|
from difflib import SequenceMatcher
|
|
|
|
|
from functools import partial
|
|
|
|
|
from typing import List, Sequence, Set, Mapping
|
|
|
|
|
from typing import Callable, List, Sequence, Set, Mapping
|
|
|
|
|
import math
|
|
|
|
|
import requests
|
|
|
|
|
import sys
|
|
|
|
|
@@ -165,23 +165,41 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
|
|
|
|
|
time.sleep(sleep_schedule.get(remaining, 1))
|
|
|
|
|
return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _fetch_all_from_spotify_in_chunks(spotify_session: spotipy.Spotify, fetch_function: Callable) -> List[dict]:
|
|
|
|
|
output = []
|
|
|
|
|
results = fetch_function(0, spotify_session)
|
|
|
|
|
output.extend([item['track'] for item in results['items'] if item['track'] is not None])
|
|
|
|
|
|
|
|
|
|
# Get all the remaining tracks in parallel
|
|
|
|
|
if results['next']:
|
|
|
|
|
offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))]
|
|
|
|
|
extra_results = await asyncio.gather(
|
|
|
|
|
*[asyncio.to_thread(fetch_function, offset, spotify_session) for offset in offsets]
|
|
|
|
|
)
|
|
|
|
|
for extra_result in extra_results:
|
|
|
|
|
output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None])
|
|
|
|
|
|
|
|
|
|
return output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
|
|
|
|
|
def _get_tracks_from_spotify_playlist(offset: int, spotify_session: spotipy.Spotify, playlist_id: str):
|
|
|
|
|
fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
|
|
|
|
|
return spotify_session.playlist_tracks(playlist_id, fields, offset=offset)
|
|
|
|
|
return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset)
|
|
|
|
|
|
|
|
|
|
output = []
|
|
|
|
|
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
|
|
|
|
|
results = _get_tracks_from_spotify_playlist( 0, spotify_session, spotify_playlist["id"] )
|
|
|
|
|
output.extend([r['track'] for r in results['items'] if r['track'] is not None])
|
|
|
|
|
return await _fetch_all_from_spotify_in_chunks(spotify_session=spotify_session, fetch_function=lambda offset, session: _get_tracks_from_spotify_playlist(offset=offset, spotify_session=session, playlist_id=spotify_playlist["id"]))
|
|
|
|
|
|
|
|
|
|
# get all the remaining tracks in parallel
|
|
|
|
|
if results['next']:
|
|
|
|
|
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
|
|
|
|
|
extra_results = await atqdm.gather( *[asyncio.to_thread(_get_tracks_from_spotify_playlist, offset, spotify_session=spotify_session, playlist_id=spotify_playlist["id"]) for offset in offsets ] )
|
|
|
|
|
for extra_result in extra_results:
|
|
|
|
|
output.extend([r['track'] for r in extra_result['items'] if r['track'] is not None])
|
|
|
|
|
return output
|
|
|
|
|
|
|
|
|
|
async def get_tracks_from_spotify_favorites(spotify_session: spotipy.Spotify) -> List[dict]:
|
|
|
|
|
def _get_favorite_tracks(offset: int, spotify_session: spotipy.Spotify):
|
|
|
|
|
return spotify_session.current_user_saved_tracks(offset=offset)
|
|
|
|
|
|
|
|
|
|
print("Loading favorite tracks from Spotify")
|
|
|
|
|
tracks = await _fetch_all_from_spotify_in_chunks(spotify_session=spotify_session, fetch_function=_get_favorite_tracks)
|
|
|
|
|
tracks.reverse()
|
|
|
|
|
return tracks
|
|
|
|
|
|
|
|
|
|
def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]):
|
|
|
|
|
""" Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """
|
|
|
|
|
@@ -238,7 +256,37 @@ def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.Spotify
|
|
|
|
|
seen_tracks.add(tidal_id)
|
|
|
|
|
return output
|
|
|
|
|
|
|
|
|
|
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config):
|
|
|
|
|
async def sync_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track], config: dict, tidal_session: tidalapi.Session, tidal_playlist: tidalapi.UserPlaylist, sync_favorites: bool = False):
|
|
|
|
|
new_spotify_tracks = get_new_spotify_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks)
|
|
|
|
|
if not new_spotify_tracks:
|
|
|
|
|
print("No new tracks to search in Spotify tracks")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
new_tidal_tracks = await search_new_tracks_on_tidal(new_spotify_tracks=new_spotify_tracks, tidal_session=tidal_session, config=config)
|
|
|
|
|
|
|
|
|
|
update_tidal_playlist(
|
|
|
|
|
old_tidal_tracks=old_tidal_tracks,
|
|
|
|
|
new_tidal_tracks=new_tidal_tracks,
|
|
|
|
|
tidal_playlist=tidal_playlist,
|
|
|
|
|
tidal_session=tidal_session,
|
|
|
|
|
sync_favorites=sync_favorites
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> List[t_spotify.SpotifyTrack]:
|
|
|
|
|
''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures '''
|
|
|
|
|
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
|
|
|
|
|
results = []
|
|
|
|
|
for spotify_track in spotify_tracks:
|
|
|
|
|
if not spotify_track['id']: continue
|
|
|
|
|
if not track_match_cache.get(spotify_track['id']) and not failure_cache.has_match_failure(spotify_track['id']):
|
|
|
|
|
results.append(spotify_track)
|
|
|
|
|
return results
|
|
|
|
|
|
|
|
|
|
async def search_new_tracks_on_tidal(new_spotify_tracks: List[t_spotify.SpotifyTrack], tidal_session: tidalapi.Session, config: dict) -> List[tidalapi.Track | None]:
|
|
|
|
|
''' Searches for the new Spotify tracks on Tidal '''
|
|
|
|
|
task_description = f"Searching Tidal for {len(new_spotify_tracks)} new Spotify tracks"
|
|
|
|
|
semaphore = asyncio.Semaphore(value=config.get('max_concurrency', 10))
|
|
|
|
|
|
|
|
|
|
async def _run_rate_limiter(semaphore):
|
|
|
|
|
''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit'''
|
|
|
|
|
_sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket'
|
|
|
|
|
@@ -251,50 +299,68 @@ async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalap
|
|
|
|
|
t0 = t
|
|
|
|
|
[semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket'
|
|
|
|
|
|
|
|
|
|
rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore))
|
|
|
|
|
search_results = await atqdm.gather(
|
|
|
|
|
*[repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in new_spotify_tracks],
|
|
|
|
|
desc=task_description
|
|
|
|
|
)
|
|
|
|
|
rate_limiter_task.cancel()
|
|
|
|
|
|
|
|
|
|
# Add the search results to the cache
|
|
|
|
|
for idx, spotify_track in enumerate(new_spotify_tracks):
|
|
|
|
|
if search_results[idx]:
|
|
|
|
|
track_match_cache.insert((spotify_track['id'], search_results[idx].id))
|
|
|
|
|
else:
|
|
|
|
|
color = ('\033[91m', '\033[0m')
|
|
|
|
|
print(color[0] + f"Could not find track {spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}" + color[1])
|
|
|
|
|
|
|
|
|
|
return search_results
|
|
|
|
|
|
|
|
|
|
def update_tidal_playlist(old_tidal_tracks: Sequence[tidalapi.Track], new_tidal_tracks: List[tidalapi.Track | None], tidal_playlist: tidalapi.UserPlaylist, tidal_session: tidalapi.Session, sync_favorites: bool = False):
|
|
|
|
|
''' Updates the Tidal playlist or favorites with the new tracks '''
|
|
|
|
|
old_tidal_track_ids = [t.id for t in old_tidal_tracks]
|
|
|
|
|
new_tidal_track_ids = [t.id for t in new_tidal_tracks if t]
|
|
|
|
|
|
|
|
|
|
if new_tidal_track_ids == old_tidal_track_ids:
|
|
|
|
|
print("No changes to write to Tidal")
|
|
|
|
|
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
|
|
|
|
|
# Append new tracks to the existing playlist or favorites if possible
|
|
|
|
|
add_multiple_tracks_to_playlist(
|
|
|
|
|
playlist=tidal_playlist,
|
|
|
|
|
session=tidal_session,
|
|
|
|
|
track_ids=new_tidal_track_ids[len(old_tidal_track_ids):],
|
|
|
|
|
sync_favorites=sync_favorites
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
# Erase old playlist or favorites and add new tracks from scratch if any reordering occured
|
|
|
|
|
set_tidal_playlist(
|
|
|
|
|
playlist=tidal_playlist,
|
|
|
|
|
session=tidal_session,
|
|
|
|
|
track_ids=new_tidal_track_ids,
|
|
|
|
|
sync_favorites=sync_favorites
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config):
|
|
|
|
|
for spotify_playlist, tidal_playlist in playlists:
|
|
|
|
|
asyncio.run(main=sync_playlist(spotify_session=spotify_session, tidal_session=tidal_session, spotify_playlist=spotify_playlist, tidal_playlist=tidal_playlist, config=config) )
|
|
|
|
|
|
|
|
|
|
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config):
|
|
|
|
|
# Create a new Tidal playlist if required
|
|
|
|
|
if not tidal_playlist:
|
|
|
|
|
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
|
|
|
|
|
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])
|
|
|
|
|
|
|
|
|
|
# Extract the new tracks from the playlist that we haven't already seen before
|
|
|
|
|
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
|
|
|
|
|
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session=spotify_session, spotify_playlist=spotify_playlist)
|
|
|
|
|
old_tidal_tracks = tidal_playlist.tracks()
|
|
|
|
|
tracks_to_search = get_new_tracks_from_spotify_playlist(spotify_tracks, old_tidal_tracks)
|
|
|
|
|
if not tracks_to_search:
|
|
|
|
|
print("No new tracks to search in Spotify playlist '{}'".format(spotify_playlist['name']))
|
|
|
|
|
return
|
|
|
|
|
await sync_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks, config=config, tidal_session=tidal_session, tidal_playlist=tidal_playlist)
|
|
|
|
|
|
|
|
|
|
# Search for each of the tracks on Tidal concurrently
|
|
|
|
|
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), spotify_playlist['name'])
|
|
|
|
|
semaphore = asyncio.Semaphore(config.get('max_concurrency', 10))
|
|
|
|
|
rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore))
|
|
|
|
|
search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description )
|
|
|
|
|
rate_limiter_task.cancel()
|
|
|
|
|
def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
|
|
|
|
|
asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config))
|
|
|
|
|
|
|
|
|
|
# Add the search results to the cache
|
|
|
|
|
for idx, spotify_track in enumerate(tracks_to_search):
|
|
|
|
|
if search_results[idx]:
|
|
|
|
|
track_match_cache.insert( (spotify_track['id'], search_results[idx].id) )
|
|
|
|
|
else:
|
|
|
|
|
color = ('\033[91m', '\033[0m')
|
|
|
|
|
print(color[0] + "Could not find track {}: {} - {}".format(spotify_track['id'], ",".join([a['name'] for a in spotify_track['artists']]), spotify_track['name']) + color[1])
|
|
|
|
|
|
|
|
|
|
# Update the Tidal playlist if there are changes
|
|
|
|
|
old_tidal_track_ids = [t.id for t in old_tidal_tracks]
|
|
|
|
|
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
|
|
|
|
|
if new_tidal_track_ids == old_tidal_track_ids:
|
|
|
|
|
print("No changes to write to Tidal playlist")
|
|
|
|
|
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
|
|
|
|
|
# Append new tracks to the existing playlist if possible
|
|
|
|
|
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):])
|
|
|
|
|
else:
|
|
|
|
|
# Erase old playlist and add new tracks from scratch if any reordering occured
|
|
|
|
|
set_tidal_playlist(tidal_playlist, new_tidal_track_ids)
|
|
|
|
|
|
|
|
|
|
def sync_list(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config):
|
|
|
|
|
for spotify_playlist, tidal_playlist in playlists:
|
|
|
|
|
# sync the spotify playlist to tidal
|
|
|
|
|
asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) )
|
|
|
|
|
async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
|
|
|
|
|
spotify_tracks = await get_tracks_from_spotify_favorites(spotify_session=spotify_session)
|
|
|
|
|
old_tidal_tracks = tidal_session.user.favorites.tracks(order='DATE')
|
|
|
|
|
await sync_tracks(spotify_tracks=spotify_tracks, old_tidal_tracks=old_tidal_tracks, config=config, tidal_session=tidal_session, tidal_playlist=tidal_session.user.favorites, sync_favorites=True)
|
|
|
|
|
|
|
|
|
|
def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]):
|
|
|
|
|
if spotify_playlist['name'] in tidal_playlists:
|
|
|
|
|
@@ -333,14 +399,14 @@ def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: t
|
|
|
|
|
def get_playlist_ids(config):
|
|
|
|
|
return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']]
|
|
|
|
|
output = []
|
|
|
|
|
for spotify_id, tidal_id in get_playlist_ids(config):
|
|
|
|
|
for spotify_id, tidal_id in get_playlist_ids(config=config):
|
|
|
|
|
try:
|
|
|
|
|
spotify_playlist = spotify_session.playlist(spotify_id)
|
|
|
|
|
spotify_playlist = spotify_session.playlist(playlist_id=spotify_id)
|
|
|
|
|
except spotipy.SpotifyException as e:
|
|
|
|
|
print(f"Error getting Spotify playlist {spotify_id}")
|
|
|
|
|
raise e
|
|
|
|
|
try:
|
|
|
|
|
tidal_playlist = tidal_session.playlist(tidal_id)
|
|
|
|
|
tidal_playlist = tidal_session.playlist(playlist_id=tidal_id)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"Error getting Tidal playlist {tidal_id}")
|
|
|
|
|
raise e
|
|
|
|
|
|