Performance optimization / refactoring (#43)

This replaces #36 and adds some other fixes!

Execution speed should be much faster now, especially when there are not
many changes to synchronize.

* Maintain track cache between different playlists (thanks to @joshrmcdaniel for amazing work on that!)
* Fix incorrect tidal_playlist_is_dirty() implementation
* Remove more redundant API calls
* Avoid unnecessarily spinning up tasks for tracks that were in match failure cache
* Introduce new rate_limit configuration parameter implemented with leaky bucket rate-limiting algorithm
* Where possible, add new tracks to existing playlist instead of erasing the old ones
* Use asyncio multithreading instead of multiprocessing
* When user has large number of spotify playlists, fetch them in parallel instead of one by one
* More typing hints / typing fixes
This commit is contained in:
Tim Rae
2024-06-03 09:11:56 +02:00
committed by GitHub
parent 689637510d
commit 1e8366a0e8
5 changed files with 207 additions and 165 deletions

View File

@@ -14,6 +14,6 @@ spotify:
#excluded_playlists:
# - spotify:playlist:1ABCDEqsABCD6EaABCDa0a
# number of concurrent subprocesses when searching tracks in a playlist.
# increasing this value can improve sync speed, but may increase 429 errors
subprocesses: 25
# increasing these parameters should increase the search speed, while decreasing reduces likelihood of 429 errors
max_concurrency: 10 # max concurrent connections at any given time
rate_limit: 12 # max sustained connections per second

View File

@@ -27,7 +27,7 @@ def main():
_sync.sync_list(spotify_session, tidal_session, [tidal_playlist], config)
elif config.get('sync_playlists', None):
# if the config contains a sync_playlists list of mappings then use that
_sync.sync_list(spotify_session, tidal_session, _sync.get_playlists_from_config(config), config)
_sync.sync_list(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
else:
# otherwise just use the user playlists in the Spotify account
_sync.sync_list(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)

View File

@@ -1,9 +1,15 @@
import datetime
import sqlalchemy
from sqlalchemy import Table, Column, String, DateTime, MetaData, insert, select, update, delete
from typing import Dict, List, Sequence, Set, Mapping
class Database:
class MatchFailureDatabase:
"""
sqlite database of match failures which persists between runs
this can be used concurrently between multiple processes
"""
def __init__(self, filename='.cache.db'):
self.engine = sqlalchemy.create_engine(f"sqlite:///{filename}")
meta = MetaData()
@@ -15,7 +21,7 @@ class Database:
sqlite_autoincrement=False)
meta.create_all(self.engine)
def _get_next_retry_time(self, insert_time=None):
def _get_next_retry_time(self, insert_time: datetime.datetime | None = None) -> datetime.datetime:
if insert_time:
# double interval on each retry
interval = 2 * (datetime.datetime.now() - insert_time)
@@ -23,7 +29,7 @@ class Database:
interval = datetime.timedelta(days=7)
return datetime.datetime.now() + interval
def cache_match_failure(self, track_id):
def cache_match_failure(self, track_id: str):
""" notifies that matching failed for the given track_id """
fetch_statement = select(self.match_failures).where(
self.match_failures.c.track_id == track_id)
@@ -40,7 +46,7 @@ class Database:
connection.execute(insert(self.match_failures), {
"track_id": track_id, "insert_time": datetime.datetime.now(), "next_retry": self._get_next_retry_time()})
def has_match_failure(self, track_id):
def has_match_failure(self, track_id: str) -> bool:
""" checks if there was a recent search for which matching failed with the given track_id """
statement = select(self.match_failures.c.next_retry).where(
self.match_failures.c.track_id == track_id)
@@ -50,7 +56,7 @@ class Database:
return match_failure.next_retry > datetime.datetime.now()
return False
def remove_match_failure(self, track_id):
def remove_match_failure(self, track_id: str):
""" removes match failure from the database """
statement = delete(self.match_failures).where(
self.match_failures.c.track_id == track_id)
@@ -59,5 +65,20 @@ class Database:
connection.execute(statement)
class TrackMatchCache:
"""
Non-persistent mapping of spotify ids -> tidal_ids
This should NOT be accessed concurrently from multiple processes
"""
data: Dict[str, int] = {}
def get(self, track_id: str) -> int | None:
return self.data.get(track_id, None)
def insert(self, mapping: tuple[str, int]):
self.data[mapping[0]] = mapping[1]
# Main singleton instance
failure_cache = Database()
failure_cache = MatchFailureDatabase()
track_match_cache = TrackMatchCache()

View File

@@ -1,23 +1,23 @@
#!/usr/bin/env python3
from .database import failure_cache
import asyncio
from .cache import failure_cache, track_match_cache
from functools import partial
from typing import Sequence, Set, Mapping
from multiprocessing import Pool
from typing import List, Sequence, Set, Mapping
import math
import requests
import sys
import spotipy
import tidalapi
from .tidalapi_patch import set_tidal_playlist
from .tidalapi_patch import add_multiple_tracks_to_playlist, set_tidal_playlist
import time
from tqdm import tqdm
from tqdm.asyncio import tqdm as atqdm
import traceback
import unicodedata
import math
from .type import spotify as t_spotify
def normalize(s) -> str:
return unicodedata.normalize('NFD', s).encode('ascii', 'ignore').decode('ascii')
@@ -30,7 +30,7 @@ def isrc_match(tidal_track: tidalapi.Track, spotify_track) -> bool:
return tidal_track.isrc == spotify_track["external_ids"]["isrc"]
return False
def duration_match(tidal_track: tidalapi.Track, spotify_track, tolerance=2) -> float:
def duration_match(tidal_track: tidalapi.Track, spotify_track, tolerance=2) -> bool:
# the duration of the two tracks must be the same to within 2 seconds
return abs(tidal_track.duration - spotify_track['duration_ms']/1000) < tolerance
@@ -50,7 +50,7 @@ def name_match(tidal_track, spotify_track) -> bool:
simple_spotify_track = simple(spotify_track['name'].lower()).split('feat.')[0].strip()
return simple_spotify_track in tidal_track.name.lower() or normalize(simple_spotify_track) in normalize(tidal_track.name.lower())
def artist_match(tidal_track: tidalapi.Track, spotify_track) -> Set[str]:
def artist_match(tidal_track: tidalapi.Track, spotify_track) -> bool:
def split_artist_name(artist: str) -> Sequence[str]:
if '&' in artist:
return artist.split('&')
@@ -60,7 +60,7 @@ def artist_match(tidal_track: tidalapi.Track, spotify_track) -> Set[str]:
return [artist]
def get_tidal_artists(tidal_track: tidalapi.Track, do_normalize=False) -> Set[str]:
result = []
result: list[str] = []
for artist in tidal_track.artists:
if do_normalize:
artist_name = normalize(artist.name)
@@ -70,7 +70,7 @@ def artist_match(tidal_track: tidalapi.Track, spotify_track) -> Set[str]:
return set([simple(x.strip().lower()) for x in result])
def get_spotify_artists(spotify_track: t_spotify.SpotifyTrack, do_normalize=False) -> Set[str]:
result = []
result: list[str] = []
for artist in spotify_track['artists']:
if do_normalize:
artist_name = normalize(artist['name'])
@@ -91,27 +91,35 @@ def match(tidal_track, spotify_track) -> bool:
and artist_match(tidal_track, spotify_track)
)
def tidal_search(spotify_track_and_cache, tidal_session: tidalapi.Session) -> tidalapi.Track | None:
spotify_track, cached_tidal_track = spotify_track_and_cache
if cached_tidal_track: return cached_tidal_track
if spotify_track['id'] is None: return None
if failure_cache.has_match_failure(spotify_track['id']):
return None
# search for album name and first album artist
if 'album' in spotify_track and 'artists' in spotify_track['album'] and len(spotify_track['album']['artists']):
album_result = tidal_session.search(simple(spotify_track['album']['name']) + " " + simple(spotify_track['album']['artists'][0]['name']), models=[tidalapi.album.Album])
for album in album_result['albums']:
album_tracks = album.tracks()
if len(album_tracks) >= spotify_track['track_number']:
track = album_tracks[spotify_track['track_number'] - 1]
if match(track, spotify_track):
failure_cache.remove_match_failure(spotify_track['id'])
return track
# if that fails then search for track name and first artist
for track in tidal_session.search(simple(spotify_track['name']) + ' ' + simple(spotify_track['artists'][0]['name']), models=[tidalapi.media.Track])['tracks']:
if match(track, spotify_track):
failure_cache.remove_match_failure(spotify_track['id'])
return track
async def tidal_search(spotify_track, rate_limiter, tidal_session: tidalapi.Session) -> tidalapi.Track | None:
def _search_for_track_in_album():
# search for album name and first album artist
if 'album' in spotify_track and 'artists' in spotify_track['album'] and len(spotify_track['album']['artists']):
album_result = tidal_session.search(simple(spotify_track['album']['name']) + " " + simple(spotify_track['album']['artists'][0]['name']), models=[tidalapi.album.Album])
for album in album_result['albums']:
album_tracks = album.tracks()
if len(album_tracks) >= spotify_track['track_number']:
track = album_tracks[spotify_track['track_number'] - 1]
if match(track, spotify_track):
failure_cache.remove_match_failure(spotify_track['id'])
return track
def _search_for_standalone_track():
# if album search fails then search for track name and first artist
for track in tidal_session.search(simple(spotify_track['name']) + ' ' + simple(spotify_track['artists'][0]['name']), models=[tidalapi.media.Track])['tracks']:
if match(track, spotify_track):
failure_cache.remove_match_failure(spotify_track['id'])
return track
await rate_limiter.acquire()
album_search = await asyncio.to_thread( _search_for_track_in_album )
if album_search:
return album_search
await rate_limiter.acquire()
track_search = await asyncio.to_thread( _search_for_standalone_track )
if track_search:
return track_search
return None
# if none of the search modes succeeded then store the track id to the failure cache
failure_cache.cache_match_failure(spotify_track['id'])
def get_tidal_playlists_dict(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
@@ -123,17 +131,17 @@ def get_tidal_playlists_dict(tidal_session: tidalapi.Session) -> Mapping[str, ti
output[playlist.name] = playlist
return output
def repeat_on_request_error(function, *args, remaining=5, **kwargs):
async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
# utility to repeat calling the function up to 5 times if an exception is thrown
try:
return function(*args, **kwargs)
except requests.exceptions.RequestException as e:
return await function(*args, **kwargs)
except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException) as e:
if remaining:
print(f"{str(e)} occurred, retrying {remaining} times")
else:
print(f"{str(e)} could not be recovered")
if not e.response is None:
if isinstance(e, requests.exceptions.RequestException) and not e.response is None:
print(f"Response message: {e.response.text}")
print(f"Response headers: {e.response.headers}")
@@ -144,27 +152,13 @@ def repeat_on_request_error(function, *args, remaining=5, **kwargs):
sys.exit(1)
sleep_schedule = {5: 1, 4:10, 3:60, 2:5*60, 1:10*60} # sleep variable length of time depending on retry number
time.sleep(sleep_schedule.get(remaining, 1))
return repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs)
return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs)
def _enumerate_wrapper(value_tuple, function, **kwargs):
# just a wrapper which accepts a tuple from enumerate and returns the index back as the first argument
index, value = value_tuple
return (index, repeat_on_request_error(function, value, **kwargs))
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist: t_spotify.SpotifyPlaylist):
def _get_tracks_from_spotify_playlist(offset: int, spotify_session: spotipy.Spotify, playlist_id: str):
fields="next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
return spotify_session.playlist_tracks(playlist_id, fields, offset=offset)
def call_async_with_progress(function, values, description, num_processes, **kwargs):
results = len(values)*[None]
with Pool(processes=num_processes) as process_pool:
for index, result in tqdm(process_pool.imap_unordered(partial(_enumerate_wrapper, function=function, **kwargs),
enumerate(values)), total=len(values), desc=description):
results[index] = result
return results
def _get_tracks_from_spotify_playlist(offset: int, spotify_session: spotipy.Spotify, playlist_id: str):
""" implementation function for use with multiprocessing module """
fields="next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
return spotify_session.playlist_tracks(playlist_id, fields, offset=offset)
def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
output = []
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
results = _get_tracks_from_spotify_playlist( 0, spotify_session, spotify_playlist["id"] )
@@ -173,133 +167,158 @@ def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_p
# get all the remaining tracks in parallel
if results['next']:
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
extra_results = call_async_with_progress(_get_tracks_from_spotify_playlist, offsets, "",
min(len(offsets), 10), spotify_session=spotify_session, playlist_id=spotify_playlist["id"])
extra_results = await atqdm.gather( *[asyncio.to_thread(_get_tracks_from_spotify_playlist, offset, spotify_session=spotify_session, playlist_id=spotify_playlist["id"]) for offset in offsets ] )
for extra_result in extra_results:
output.extend([r['track'] for r in extra_result['items'] if r['track'] is not None])
return output
class TidalPlaylistCache:
def __init__(self, playlist: tidalapi.Playlist):
self._data = playlist.tracks()
def _search(self, spotify_track: t_spotify.SpotifyTrack):
''' check if the given spotify track was already in the tidal playlist.'''
results = []
for tidal_track in self._data:
def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]):
""" Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """
def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack):
for idx, tidal_track in list(enumerate(tidal_tracks)):
if match(tidal_track, spotify_track):
return tidal_track
return None
track_match_cache.insert((spotify_track['id'], tidal_track.id))
tidal_tracks.pop(idx)
return
def search(self, spotify_session: spotipy.Spotify, spotify_playlist):
''' Add the cached tidal track where applicable to a list of spotify tracks '''
results = []
cache_hits = 0
work_to_do = False
spotify_tracks = get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
for track in spotify_tracks:
cached_track = self._search(track)
if cached_track:
results.append( (track, cached_track) )
cache_hits += 1
else:
results.append( (track, None) )
return (results, cache_hits)
def _populate_one_track_from_tidal(tidal_track: tidalapi.Track):
for idx, spotify_track in list(enumerate(spotify_tracks)):
if match(tidal_track, spotify_track):
track_match_cache.insert((spotify_track['id'], tidal_track.id))
spotify_tracks.pop(idx)
return
def tidal_playlist_is_dirty(playlist: tidalapi.Playlist, new_track_ids: Sequence[str]) -> bool:
old_tracks = playlist.tracks()
if len(old_tracks) != len(new_track_ids):
return True
for i in range(len(old_tracks)):
if old_tracks[i].id != new_track_ids[i]:
return True
return False
# make a copy of the tracks to avoid modifying original arrays
spotify_tracks = [t for t in spotify_tracks_]
tidal_tracks = [t for t in tidal_tracks_]
def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_id: str, tidal_id: int, config):
try:
spotify_playlist = spotify_session.playlist(spotify_id)
except spotipy.SpotifyException as e:
print("Error getting Spotify playlist " + spotify_id)
print(e)
return
if tidal_id:
# if a Tidal playlist was specified then look it up
try:
tidal_playlist = tidal_session.playlist(tidal_id)
except Exception as e:
print("Error getting Tidal playlist " + tidal_id)
print(e)
return
else:
# create a new Tidal playlist if required
# first populate from the tidal tracks
for track in tidal_tracks:
_populate_one_track_from_tidal(track)
# then populate from the subset of Spotify tracks that didn't match (to account for many-to-one style mappings)
for track in spotify_tracks:
_populate_one_track_from_spotify(track)
def get_new_tracks_from_spotify_playlist(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> list[t_spotify.SpotifyTrack]:
''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures '''
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
results = []
for spotify_track in spotify_tracks:
if not spotify_track['id']: continue
if not track_match_cache.get(spotify_track['id']) and not failure_cache.has_match_failure(spotify_track['id']):
results.append(spotify_track)
return results
def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.SpotifyTrack]) -> Sequence[int]:
''' gets list of corresponding tidal track ids for each spotify track, ignoring duplicates '''
output = []
seen_tracks = set()
for spotify_track in spotify_tracks:
tidal_id = track_match_cache.get(spotify_track['id'])
if tidal_id and not tidal_id in seen_tracks:
output.append(tidal_id)
seen_tracks.add(tidal_id)
return output
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config):
async def _run_rate_limiter(semaphore):
''' Leaky bucket algorithm for rate limiting. Periodically releases an item from semaphore at rate_limit'''
while True:
await asyncio.sleep(1/config.get('rate_limit', 12)) # sleep for min time between new function executions
semaphore.release() # leak one item from the 'bucket'
# Create a new Tidal playlist if required
if not tidal_playlist:
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])
tidal_track_ids = []
spotify_tracks, cache_hits = TidalPlaylistCache(tidal_playlist).search(spotify_session, spotify_playlist)
if cache_hits == len(spotify_tracks):
# Extract the new tracks from the playlist that we haven't already seen before
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
old_tidal_tracks = tidal_playlist.tracks()
tracks_to_search = get_new_tracks_from_spotify_playlist(spotify_tracks, old_tidal_tracks)
if not tracks_to_search:
print("No new tracks to search in Spotify playlist '{}'".format(spotify_playlist['name']))
return
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(spotify_tracks) - cache_hits, len(spotify_tracks), spotify_playlist['name'])
tidal_tracks = call_async_with_progress(tidal_search, spotify_tracks, task_description, config.get('subprocesses', 25), tidal_session=tidal_session)
for index, tidal_track in enumerate(tidal_tracks):
spotify_track = spotify_tracks[index][0]
if tidal_track:
tidal_track_ids.append(tidal_track.id)
# Search for each of the tracks on Tidal concurrently
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), spotify_playlist['name'])
semaphore = asyncio.Semaphore(config.get('max_concurrency', 10))
rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore))
search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description )
rate_limiter_task.cancel()
# Add the search results to the cache
for idx, spotify_track in enumerate(tracks_to_search):
if search_results[idx]:
track_match_cache.insert( (spotify_track['id'], search_results[idx].id) )
else:
color = ('\033[91m', '\033[0m')
print(color[0] + "Could not find track {}: {} - {}".format(spotify_track['id'], ",".join([a['name'] for a in spotify_track['artists']]), spotify_track['name']) + color[1])
if tidal_playlist_is_dirty(tidal_playlist, tidal_track_ids):
set_tidal_playlist(tidal_playlist, tidal_track_ids)
else:
# Update the Tidal playlist if there are changes
old_tidal_track_ids = [t.id for t in old_tidal_tracks]
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
if new_tidal_track_ids == old_tidal_track_ids:
print("No changes to write to Tidal playlist")
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
# Append new tracks to the existing playlist if possible
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):])
else:
# Erase old playlist and add new tracks from scratch if any reordering occured
set_tidal_playlist(tidal_playlist, new_tidal_track_ids)
def sync_list(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists: Mapping[str, tidalapi.Playlist], config):
results = []
for spotify_id, tidal_id in playlists:
def sync_list(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config):
for spotify_playlist, tidal_playlist in playlists:
# sync the spotify playlist to tidal
repeat_on_request_error(sync_playlist, spotify_session, tidal_session, spotify_id, tidal_id, config)
results.append(tidal_id)
return results
asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) )
def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]):
if spotify_playlist['name'] in tidal_playlists:
# if there's an existing tidal playlist with the name of the current playlist then use that
tidal_playlist = tidal_playlists[spotify_playlist['name']]
return (spotify_playlist['id'], tidal_playlist.id)
return (spotify_playlist, tidal_playlist)
else:
return (spotify_playlist['id'], None)
return (spotify_playlist, None)
def get_user_playlist_mappings(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
results = []
spotify_playlists = get_playlists_from_spotify(spotify_session, config)
tidal_playlists = get_tidal_playlists_dict(tidal_session)
for spotify_playlist in spotify_playlists:
results.append( pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) )
return results
results = []
spotify_playlists = asyncio.run(get_playlists_from_spotify(spotify_session, config))
tidal_playlists = get_tidal_playlists_dict(tidal_session)
for spotify_playlist in spotify_playlists:
results.append( pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) )
return results
def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config):
async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config):
# get all the user playlists from the Spotify account
playlists = []
with tqdm(total=1.0) as pbar:
pbar.set_description("Loading Spotify playlists")
spotify_results = spotify_session.user_playlists(config['spotify']['username'])
total = spotify_results['total']
exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])])
while True:
pbar.update(len(spotify_results['items'])/total)
for spotify_playlist in spotify_results['items']:
if spotify_playlist['owner']['id'] == config['spotify']['username'] and not spotify_playlist['id'] in exclude_list:
playlists.append(spotify_playlist)
# move to the next page of results if there are still playlists remaining
if spotify_results['next']:
spotify_results = spotify_session.next(spotify_results)
else:
break
print("Loading Spotify playlists")
results = spotify_session.user_playlists(config['spotify']['username'])
exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])])
# get all the remaining playlists in parallel
if results['next']:
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.user_playlists, config['spotify']['username'], offset=offset) for offset in offsets ] )
for extra_result in extra_results:
playlists.extend([p for p in extra_result['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list])
return playlists
def get_playlists_from_config(config):
def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
# get the list of playlist sync mappings from the configuration file
return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']]
def get_playlist_ids(config):
return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']]
output = []
for spotify_id, tidal_id in get_playlist_ids(config):
try:
spotify_playlist = spotify_session.playlist(spotify_id)
except spotipy.SpotifyException as e:
print(f"Error getting Spotify playlist {spotify_id}")
raise e
try:
tidal_playlist = tidal_session.playlist(tidal_id)
except Exception as e:
print(f"Error getting Tidal playlist {tidal_id}")
raise e
output.append((spotify_playlist, tidal_playlist))
return output

View File

@@ -1,19 +1,21 @@
from typing import List
import tidalapi
from tqdm import tqdm
def _remove_indices_from_playlist(playlist, indices):
def _remove_indices_from_playlist(playlist: tidalapi.UserPlaylist, indices: List[int]):
headers = {'If-None-Match': playlist._etag}
index_string = ",".join(map(str, indices))
playlist.request.request('DELETE', (playlist._base_url + '/items/%s') % (playlist.id, index_string), headers=headers)
playlist._reparse()
def clear_tidal_playlist(playlist, chunk_size=20):
def clear_tidal_playlist(playlist: tidalapi.UserPlaylist, chunk_size: int=20):
with tqdm(desc="Erasing existing tracks from Tidal playlist", total=playlist.num_tracks) as progress:
while playlist.num_tracks:
indices = range(min(playlist.num_tracks, chunk_size))
_remove_indices_from_playlist(playlist, indices)
progress.update(len(indices))
def add_multiple_tracks_to_playlist(playlist, track_ids, chunk_size=20):
def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids: List[int], chunk_size: int=20):
offset = 0
with tqdm(desc="Adding new tracks to Tidal playlist", total=len(track_ids)) as progress:
while offset < len(track_ids):
@@ -22,6 +24,6 @@ def add_multiple_tracks_to_playlist(playlist, track_ids, chunk_size=20):
offset += count
progress.update(count)
def set_tidal_playlist(playlist, track_ids):
def set_tidal_playlist(playlist: tidalapi.Playlist, track_ids: List[int]):
clear_tidal_playlist(playlist)
add_multiple_tracks_to_playlist(playlist, track_ids)