Various refactoring, improvements, fixes

This commit is contained in:
Timothy Rae
2022-02-10 13:16:00 +13:00
parent e2ead013ad
commit f2e5e202c7
4 changed files with 144 additions and 90 deletions

View File

@@ -19,14 +19,17 @@ def open_spotify_session(config):
return spotipy.Spotify(oauth_manager=credentials_manager) return spotipy.Spotify(oauth_manager=credentials_manager)
def open_tidal_session(): def open_tidal_session(config = None):
try: try:
with open('.session.yml', 'r') as session_file: with open('.session.yml', 'r') as session_file:
previous_session = yaml.safe_load(session_file) previous_session = yaml.safe_load(session_file)
except OSError: except OSError:
previous_session = None previous_session = None
session = tidalapi.Session() if config:
session = tidalapi.Session(config=config)
else:
session = tidalapi.Session()
if previous_session: if previous_session:
try: try:
if session.load_oauth_session(previous_session['session_id'], if session.load_oauth_session(previous_session['session_id'],

View File

@@ -1,5 +1,4 @@
spotipy==2.10.0 spotipy==2.19.0
tidalapi==0.6.9 tidalapi==0.6.10
pyyaml==5.3.1 pyyaml==5.3.1
tqdm==4.45.0 tqdm==4.45.0
unidecode==1.2.0

149
sync.py
View File

@@ -7,16 +7,19 @@ from multiprocessing import Pool
import sys import sys
import spotipy import spotipy
import tidalapi import tidalapi
from tidalapi_patch import create_tidal_playlist, set_tidal_playlist
import time import time
from tqdm import tqdm from tqdm import tqdm
from unidecode import unidecode import unicodedata
from urllib.parse import urljoin
import webbrowser import webbrowser
import yaml import yaml
def normalize(s):
return unicodedata.normalize('NFD', s).encode('ascii', 'ignore').decode('ascii')
def simple(input_string): def simple(input_string):
# only take the first part of a string before any hyphens or brackets to account for different versions # only take the first part of a string before any hyphens or brackets to account for different versions
return unidecode(input_string).split('-')[0].strip().split('(')[0].strip().split('[')[0].strip() return input_string.split('-')[0].strip().split('(')[0].strip().split('[')[0].strip()
def duration_match(tidal_track, spotify_track, tolerance=2): def duration_match(tidal_track, spotify_track, tolerance=2):
# the duration of the two tracks must be the same to within 2 seconds # the duration of the two tracks must be the same to within 2 seconds
@@ -34,8 +37,9 @@ def name_match(tidal_track, spotify_track):
if exclusion_rule("remix", tidal_track, spotify_track): return False if exclusion_rule("remix", tidal_track, spotify_track): return False
# the simplified version of the Spotify track name must be a substring of the Tidal track name # the simplified version of the Spotify track name must be a substring of the Tidal track name
# Try with both un-normalized and then normalized
simple_spotify_track = simple(spotify_track['name'].lower()).split('feat.')[0].strip() simple_spotify_track = simple(spotify_track['name'].lower()).split('feat.')[0].strip()
return simple_spotify_track in tidal_track.name.lower() return simple_spotify_track in tidal_track.name.lower() or normalize(simple_spotify_track) in normalize(tidal_track.name.lower())
def artist_match(tidal_track, spotify_track): def artist_match(tidal_track, spotify_track):
def split_artist_name(artist): def split_artist_name(artist):
@@ -46,19 +50,30 @@ def artist_match(tidal_track, spotify_track):
else: else:
return [artist] return [artist]
def get_tidal_artists(tidal_track): def get_tidal_artists(tidal_track, do_normalize=False):
result = [] result = []
for artist in tidal_track.artists: for artist in tidal_track.artists:
result.extend(split_artist_name(artist.name)) if do_normalize:
artist_name = normalize(artist.name)
else:
artist_name = artist.name
result.extend(split_artist_name(artist_name))
return set([simple(x.strip().lower()) for x in result]) return set([simple(x.strip().lower()) for x in result])
def get_spotify_artists(spotify_track): def get_spotify_artists(spotify_track, do_normalize=False):
result = [] result = []
for artist in spotify_track['artists']: for artist in spotify_track['artists']:
result.extend(split_artist_name(artist['name'])) if do_normalize:
artist_name = normalize(artist['name'])
else:
artist_name = artist['name']
result.extend(split_artist_name(artist_name))
return set([simple(x.strip().lower()) for x in result]) return set([simple(x.strip().lower()) for x in result])
# There must be at least one overlapping artist between the Tidal and Spotify track # There must be at least one overlapping artist between the Tidal and Spotify track
return get_tidal_artists(tidal_track).intersection(get_spotify_artists(spotify_track)) != set() # Try with both un-normalized and then normalized
if get_tidal_artists(tidal_track).intersection(get_spotify_artists(spotify_track)) != set():
return True
return get_tidal_artists(tidal_track, True).intersection(get_spotify_artists(spotify_track, True)) != set()
def match(tidal_track, spotify_track): def match(tidal_track, spotify_track):
return duration_match(tidal_track, spotify_track) and name_match(tidal_track, spotify_track) and artist_match(tidal_track, spotify_track) return duration_match(tidal_track, spotify_track) and name_match(tidal_track, spotify_track) and artist_match(tidal_track, spotify_track)
@@ -85,52 +100,6 @@ def get_tidal_playlists_dict(tidal_session):
tidal_playlists = tidal_session.get_user_playlists(tidal_session.user.id) tidal_playlists = tidal_session.get_user_playlists(tidal_session.user.id)
return {playlist.name: playlist for playlist in tidal_playlists} return {playlist.name: playlist for playlist in tidal_playlists}
def set_tidal_playlist(session, playlist_id, track_ids):
# erases any items in the given playlist, then adds all of the tracks given in track_ids
# had to hack this together because the API doesn't include it
chunk_size = 20 # add/delete tracks in chunks of no more than this many tracks
request_params = {
'sessionId': session.session_id,
'countryCode': session.country_code,
'limit': '999',
}
def get_headers():
etag = session.request('GET','playlists/%s/tracks' % playlist_id).headers['ETag']
return {'if-none-match' : etag}
# clear all old items from playlist
playlist = session.get_playlist(playlist_id)
progress = tqdm(desc="Erasing existing tracks from Tidal playlist", total=playlist.num_tracks)
while True:
if not playlist.num_tracks:
break
track_index_string = ",".join([str(x) for x in range(min(chunk_size, playlist.num_tracks))])
result = session.request('DELETE', 'playlists/{}/tracks/{}'.format(playlist.id, track_index_string), params=request_params, headers=get_headers())
result.raise_for_status()
progress.update(min(chunk_size, playlist.num_tracks))
playlist = session.get_playlist(playlist_id)
progress.close()
# add all new items to the playlist
offset = 0
progress = tqdm(desc="Adding new tracks to Tidal playlist", total=len(track_ids))
while offset < len(track_ids):
count = min(chunk_size, len(track_ids) - offset)
data = {
'trackIds' : ",".join([str(x) for x in track_ids[offset:offset+chunk_size]]),
'toIndex' : offset
}
offset += count
result = session.request('POST', 'playlists/{}/tracks'.format(playlist.id), params=request_params, data=data, headers=get_headers())
result.raise_for_status()
progress.update(count)
progress.close()
def create_tidal_playlist(session, name):
result = session.request('POST','users/%s/playlists' % session.user.id ,data={'title': name})
return session.get_playlist(result.json()['uuid'])
def repeat_on_exception(function, *args, remaining=5, **kwargs): def repeat_on_exception(function, *args, remaining=5, **kwargs):
# utility to repeat calling the function up to 5 times if an exception is thrown # utility to repeat calling the function up to 5 times if an exception is thrown
try: try:
@@ -205,7 +174,25 @@ def tidal_playlist_is_dirty(tidal_session, playlist_id, new_track_ids):
return True return True
return False return False
def sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config): def sync_playlist(spotify_session, tidal_session, spotify_id, tidal_id, config):
try:
spotify_playlist = spotify_session.playlist(spotify_id)
except spotipy.SpotifyException as e:
print("Error getting Spotify playlist " + spotify_id)
print(e)
results.append(None)
return
if tidal_id:
# if a Tidal playlist was specified then look it up
try:
tidal_playlist = tidal_session.get_playlist(tidal_id)
except exception:
print("Error getting Tidal playlist " + tidal_id)
print(e)
return
else:
# create a new Tidal playlist if required
tidal_playlist = create_tidal_playlist(tidal_session, spotify_playlist['name'])
tidal_track_ids = [] tidal_track_ids = []
spotify_tracks, cache_hits = TidalPlaylistCache(tidal_playlist, tidal_session).search(spotify_session, spotify_playlist) spotify_tracks, cache_hits = TidalPlaylistCache(tidal_playlist, tidal_session).search(spotify_session, spotify_playlist)
if cache_hits == len(spotify_tracks): if cache_hits == len(spotify_tracks):
@@ -228,33 +215,25 @@ def sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playli
print("No changes to write to Tidal playlist") print("No changes to write to Tidal playlist")
def sync_list(spotify_session, tidal_session, playlists, config): def sync_list(spotify_session, tidal_session, playlists, config):
results = [] results = []
tidal_playlists = get_tidal_playlists_dict(tidal_session) for spotify_id, tidal_id in playlists:
for spotify_id, tidal_id in playlists: # sync the spotify playlist to tidal
try: repeat_on_exception(sync_playlist, spotify_session, tidal_session, spotify_id, tidal_id, config)
spotify_playlist = spotify_session.playlist(spotify_id) results.append(tidal_id)
except spotipy.SpotifyException as e: return results
print("Error getting Spotify playlist " + spotify_id)
print(e) def get_user_playlist_mappings(spotify_session, tidal_session, config):
results.append(None) results = []
continue spotify_playlists = get_playlists_from_spotify(spotify_session, config)
if tidal_id: tidal_playlists = get_tidal_playlists_dict(tidal_session)
# if the user manually specified the id of a Tidal playlist to use then favour that for spotify_playlist in spotify_playlists:
try: if spotify_playlist['name'] in tidal_playlists:
tidal_playlist = tidal_session.get_playlist(tidal_id) # if there's an existing tidal playlist with the name of the current playlist then use that
except exception: tidal_playlist = tidal_playlists[spotify_playlist['name']]
print("Error getting Tidal playlist " + tidal_id) results.append((spotify_playlist['id'], tidal_playlist.id))
print(e) else:
continue results.append((spotify_playlist['id'], None))
elif spotify_playlist['name'] in tidal_playlists: return results
# if there's an existing tidal playlist with the name of the current playlist then use that
tidal_playlist = tidal_playlists[spotify_playlist['name']]
else:
# otherwise create a new playlist
tidal_playlist = create_tidal_playlist(tidal_session, spotify_playlist['name'])
repeat_on_exception(sync_playlist, spotify_session, tidal_session, spotify_playlist, tidal_playlist, config)
results.append(tidal_playlist)
return results
def get_playlists_from_spotify(spotify_session, config): def get_playlists_from_spotify(spotify_session, config):
# get all the user playlists from the Spotify account # get all the user playlists from the Spotify account
@@ -264,7 +243,7 @@ def get_playlists_from_spotify(spotify_session, config):
while True: while True:
for spotify_playlist in spotify_results['items']: for spotify_playlist in spotify_results['items']:
if spotify_playlist['owner']['id'] == config['spotify']['username'] and not spotify_playlist['id'] in exclude_list: if spotify_playlist['owner']['id'] == config['spotify']['username'] and not spotify_playlist['id'] in exclude_list:
playlists.append((spotify_playlist['id'], None)) playlists.append(spotify_playlist)
# move to the next page of results if there are still playlists remaining # move to the next page of results if there are still playlists remaining
if spotify_results['next']: if spotify_results['next']:
spotify_results = spotify_session.next(spotify_results) spotify_results = spotify_session.next(spotify_results)
@@ -296,4 +275,4 @@ if __name__ == '__main__':
sync_list(spotify_session, tidal_session, get_playlists_from_config(config), config) sync_list(spotify_session, tidal_session, get_playlists_from_config(config), config)
else: else:
# otherwise just use the user playlists in the Spotify account # otherwise just use the user playlists in the Spotify account
sync_list(spotify_session, tidal_session, get_playlists_from_spotify(spotify_session, config), config) sync_list(spotify_session, tidal_session, get_user_playlist_mappings(spotify_session, tidal_session, config), config)

73
tidalapi_patch.py Normal file
View File

@@ -0,0 +1,73 @@
import tidalapi
from tqdm import tqdm
tidalapi_parse_album = tidalapi._parse_album
def patch():
tidalapi._parse_album = _parse_album
tidalapi.models.Album.picture = picture
def _parse_album(json_obj, artist=None, artists=None):
obj = tidalapi_parse_album(json_obj, artist, artists)
image_id = ""
if json_obj.get("cover"):
image_id = json_obj.get("cover")
obj.__dict__.update(image_id=image_id)
return obj
def picture(obj, width, height):
return "https://resources.tidal.com/images/{image_id}/{width}x{height}.jpg".format(
image_id=obj.image_id.replace("-", "/"), width=width, height=height
)
def set_tidal_playlist(session, playlist_id, track_ids):
# erases any items in the given playlist, then adds all of the tracks given in track_ids
# had to hack this together because the API doesn't include it
chunk_size = 20 # add/delete tracks in chunks of no more than this many tracks
request_params = {
'sessionId': session.session_id,
'countryCode': session.country_code,
'limit': '999',
}
def get_headers():
etag = session.request('GET','playlists/%s/tracks' % playlist_id).headers['ETag']
return {'if-none-match' : etag}
# clear all old items from playlist
playlist = session.get_playlist(playlist_id)
progress = tqdm(desc="Erasing existing tracks from Tidal playlist", total=playlist.num_tracks)
while True:
if not playlist.num_tracks:
break
track_index_string = ",".join([str(x) for x in range(min(chunk_size, playlist.num_tracks))])
result = session.request('DELETE', 'playlists/{}/tracks/{}'.format(playlist.id, track_index_string), params=request_params, headers=get_headers())
result.raise_for_status()
progress.update(min(chunk_size, playlist.num_tracks))
playlist = session.get_playlist(playlist_id)
progress.close()
# add all new items to the playlist
offset = 0
progress = tqdm(desc="Adding new tracks to Tidal playlist", total=len(track_ids))
while offset < len(track_ids):
count = min(chunk_size, len(track_ids) - offset)
data = {
'trackIds' : ",".join([str(x) for x in track_ids[offset:offset+chunk_size]]),
'toIndex' : offset
}
offset += count
result = session.request('POST', 'playlists/{}/tracks'.format(playlist.id), params=request_params, data=data, headers=get_headers())
result.raise_for_status()
progress.update(count)
progress.close()
def create_tidal_playlist(session, name):
result = session.request('POST','users/%s/playlists' % session.user.id ,data={'title': name})
return session.get_playlist(result.json()['uuid'])