18 Commits
0.1.4 ... 1.0.1

Author SHA1 Message Date
Tim Rae
693dcd110f Update version to 1.0.1 2024-09-12 16:59:45 +02:00
Tim Rae
17d60c019d Use user id from spotify session instead of config file
Fixes #80
2024-09-11 19:18:13 +02:00
mannp
7148053ad4 Update pyproject.toml (#79) 2024-09-09 19:03:56 +02:00
Tim Rae
64ea6f3df0 Bump version to 1.0.0
There have been some pretty significant changes recently
so figured it's time for a major version bump
2024-09-09 12:28:32 +02:00
Tim Rae
da1bc2eff9 Use utf-8 encoding for tracks not found 2024-09-09 10:02:22 +02:00
Tim Rae
5e3645319f Use current_user_playlists instead of user_playlists 2024-08-19 07:57:35 +02:00
Tim Rae
af0fd7a2cf Add retry functionality for playlist tracks 2024-08-19 07:57:35 +02:00
som
4f023d024b creates file for songs not found (#74) 2024-08-15 09:58:45 +02:00
Arnav Banerjee
54bb7097a7 fixed failing test (#68) 2024-06-11 20:32:31 +02:00
xerexoded
692939cd0e added circle CI config with basic unit tests 2024-06-10 19:29:13 +02:00
Tim Rae
ced8db44fe Fetch Tidal playlists/tracks asynchronously in chunks 2024-06-10 10:00:57 +02:00
Tim Rae
193e74b775 Never erase tracks from Tidal favorites 2024-06-10 09:32:44 +02:00
Tim Rae
7a1343ac91 Add config param for sync_favorites_default and cleanup command line arg 2024-06-10 09:32:44 +02:00
Tim Rae
6d6a4fe43e Fix favorite tracks getting cutoff at 1000 2024-06-10 09:32:44 +02:00
Tim Rae
1cdc62fd0c Cleanup partitioning of code used in favorites / playlist list 2024-06-10 09:32:44 +02:00
Cobal
54526a0306 feat: Sync favorites tracks to Tidal 2024-06-10 09:32:44 +02:00
Tim Rae
fab154851b Don't cache Tidal tracks that are unavailable
These would generally fail to be added to the Tidal playlist and could
cause playlists to be unnecessarily wiped every time the playlist is
synced
2024-06-09 12:28:52 +02:00
Raphael
b6340790ca Print duplicates that occurred while syncing (#64) 2024-06-09 10:50:10 +02:00
14 changed files with 422 additions and 80 deletions

38
.circleci/config.yml Normal file
View File

@@ -0,0 +1,38 @@
version: 2.1
jobs:
setup:
docker:
- image: circleci/python:3.10
steps:
- checkout
- run:
name: Install dependencies
command: |
python -m pip install --upgrade pip
pip install -e .
pip install pytest pytest-mock
test:
docker:
- image: circleci/python:3.10
steps:
- checkout
- run:
name: Install dependencies
command: |
python -m pip install --upgrade pip
pip install -e .
pip install pytest pytest-mock
- run:
name: Run tests
command: pytest
workflows:
version: 2
test_workflow:
jobs:
- setup
- test:
requires:
- setup

3
.gitignore vendored
View File

@@ -30,3 +30,6 @@ venv/
.mypy_cache/
.dmypy.json
dmypy.json
# Visual Studio Code
.vscode/

View File

@@ -14,6 +14,11 @@ spotify:
#excluded_playlists:
# - spotify:playlist:1ABCDEqsABCD6EaABCDa0a
# default setting for syncing favorites when no command line arguments are provided
# - when true: favorites will be synced by default (overriden when any command line arg provided)
# - when false: favorites can only be synced manually via --sync-favorites argument
sync_favorites_default: true
# increasing these parameters should increase the search speed, while decreasing reduces likelihood of 429 errors
max_concurrency: 10 # max concurrent connections at any given time
rate_limit: 10 # max sustained connections per second

View File

@@ -4,15 +4,17 @@ build-backend = "setuptools.build_meta"
[project]
name = "spotify_to_tidal"
version = "0.1.4"
version = "1.0.1"
requires-python = ">= 3.10"
dependencies = [
"spotipy~=2.21.0",
"spotipy~=2.24.0",
"tidalapi==0.7.6",
"pyyaml~=6.0",
"tqdm~=4.64",
"sqlalchemy~=2.0"
"sqlalchemy~=2.0",
"pytest~=8.0",
"pytest-mock~=3.8"
]
[tools.setuptools.packages."spotify_to_tidal"]

6
pytest.ini Normal file
View File

@@ -0,0 +1,6 @@
[pytest]
addopts = --maxfail=1 --disable-warnings
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*

View File

@@ -30,6 +30,12 @@ You can also just synchronize a specific playlist by doing the following:
spotify_to_tidal --uri 1ABCDEqsABCD6EaABCDa0a # accepts playlist id or full playlist uri
```
or sync just your 'Liked Songs' with:
```bash
spotify_to_tidal --sync-favorites
```
See example_config.yml for more configuration options, and `spotify_to_tidal --help` for more options.
---

View File

@@ -9,6 +9,7 @@ def main():
parser = argparse.ArgumentParser()
parser.add_argument('--config', default='config.yml', help='location of the config file')
parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config')
parser.add_argument('--sync-favorites', action=argparse.BooleanOptionalAction, help='synchronize the favorites')
args = parser.parse_args()
with open(args.config, 'r') as f:
@@ -22,15 +23,23 @@ def main():
if args.uri:
# if a playlist ID is explicitly provided as a command line argument then use that
spotify_playlist = spotify_session.playlist(args.uri)
tidal_playlists = _sync.get_tidal_playlists_dict(tidal_session)
tidal_playlists = _sync.get_tidal_playlists_wrapper(tidal_session)
tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists)
_sync.sync_list(spotify_session, tidal_session, [tidal_playlist], config)
_sync.sync_playlists_wrapper(spotify_session, tidal_session, [tidal_playlist], config)
sync_favorites = args.sync_favorites # only sync favorites if command line argument explicitly passed
elif args.sync_favorites:
sync_favorites = True # sync only the favorites
elif config.get('sync_playlists', None):
# if the config contains a sync_playlists list of mappings then use that
_sync.sync_list(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True)
else:
# otherwise just use the user playlists in the Spotify account
_sync.sync_list(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
# otherwise sync all the user playlists in the Spotify account and favorites unless explicitly disabled
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True)
if sync_favorites:
_sync.sync_favorites_wrapper(spotify_session, tidal_session, config)
if __name__ == '__main__':
main()

View File

@@ -11,9 +11,11 @@ __all__ = [
'open_tidal_session'
]
SPOTIFY_SCOPES = 'playlist-read-private, user-library-read'
def open_spotify_session(config) -> spotipy.Spotify:
credentials_manager = spotipy.SpotifyOAuth(username=config['username'],
scope='playlist-read-private',
scope=SPOTIFY_SCOPES,
client_id=config['client_id'],
client_secret=config['client_secret'],
redirect_uri=config['redirect_uri'],

View File

@@ -5,15 +5,16 @@ from .cache import failure_cache, track_match_cache
import datetime
from difflib import SequenceMatcher
from functools import partial
from typing import List, Sequence, Set, Mapping
from typing import Callable, List, Sequence, Set, Mapping
import math
import requests
import sys
import spotipy
import tidalapi
from .tidalapi_patch import add_multiple_tracks_to_playlist, set_tidal_playlist
from .tidalapi_patch import add_multiple_tracks_to_playlist, clear_tidal_playlist, get_all_favorites, get_all_playlists, get_all_playlist_tracks
import time
from tqdm.asyncio import tqdm as atqdm
from tqdm import tqdm
import traceback
import unicodedata
import math
@@ -133,20 +134,11 @@ async def tidal_search(spotify_track, rate_limiter, tidal_session: tidalapi.Sess
# if none of the search modes succeeded then store the track id to the failure cache
failure_cache.cache_match_failure(spotify_track['id'])
def get_tidal_playlists_dict(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
# a dictionary of name --> playlist
print("Loading Tidal playlists... This may take some time.")
tidal_playlists = tidal_session.user.playlists()
output = {}
for playlist in tidal_playlists:
output[playlist.name] = playlist
return output
async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
# utility to repeat calling the function up to 5 times if an exception is thrown
try:
return await function(*args, **kwargs)
except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException) as e:
except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException, spotipy.exceptions.SpotifyException) as e:
if remaining:
print(f"{str(e)} occurred, retrying {remaining} times")
else:
@@ -165,36 +157,46 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
time.sleep(sleep_schedule.get(remaining, 1))
return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs)
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
def _get_tracks_from_spotify_playlist(offset: int, spotify_session: spotipy.Spotify, playlist_id: str):
fields="next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
return spotify_session.playlist_tracks(playlist_id, fields, offset=offset)
async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[dict]:
output = []
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
results = _get_tracks_from_spotify_playlist( 0, spotify_session, spotify_playlist["id"] )
output.extend([r['track'] for r in results['items'] if r['track'] is not None])
results = fetch_function(0)
output.extend([item['track'] for item in results['items'] if item['track'] is not None])
# get all the remaining tracks in parallel
# Get all the remaining tracks in parallel
if results['next']:
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
extra_results = await atqdm.gather( *[asyncio.to_thread(_get_tracks_from_spotify_playlist, offset, spotify_session=spotify_session, playlist_id=spotify_playlist["id"]) for offset in offsets ] )
offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))]
extra_results = await atqdm.gather(
*[asyncio.to_thread(fetch_function, offset) for offset in offsets],
desc="Fetching additional data chunks"
)
for extra_result in extra_results:
output.extend([r['track'] for r in extra_result['items'] if r['track'] is not None])
output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None])
return output
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str):
fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset)
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
return await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, lambda offset: _get_tracks_from_spotify_playlist(offset=offset, playlist_id=spotify_playlist["id"]))
def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]):
""" Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """
def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack):
for idx, tidal_track in list(enumerate(tidal_tracks)):
if match(tidal_track, spotify_track):
if tidal_track.available and match(tidal_track, spotify_track):
track_match_cache.insert((spotify_track['id'], tidal_track.id))
tidal_tracks.pop(idx)
return
def _populate_one_track_from_tidal(tidal_track: tidalapi.Track):
for idx, spotify_track in list(enumerate(spotify_tracks)):
if match(tidal_track, spotify_track):
if tidal_track.available and match(tidal_track, spotify_track):
track_match_cache.insert((spotify_track['id'], tidal_track.id))
spotify_tracks.pop(idx)
return
@@ -210,9 +212,8 @@ def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack]
for track in spotify_tracks:
_populate_one_track_from_spotify(track)
def get_new_tracks_from_spotify_playlist(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> list[t_spotify.SpotifyTrack]:
''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures '''
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack]) -> List[t_spotify.SpotifyTrack]:
''' Extracts only the tracks that have not already been seen in our Tidal caches '''
results = []
for spotify_track in spotify_tracks:
if not spotify_track['id']: continue
@@ -224,15 +225,22 @@ def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.Spotify
''' gets list of corresponding tidal track ids for each spotify track, ignoring duplicates '''
output = []
seen_tracks = set()
for spotify_track in spotify_tracks:
if not spotify_track['id']: continue
tidal_id = track_match_cache.get(spotify_track['id'])
if tidal_id and not tidal_id in seen_tracks:
output.append(tidal_id)
seen_tracks.add(tidal_id)
if tidal_id:
if tidal_id in seen_tracks:
track_name = spotify_track['name']
artist_names = ', '.join([artist['name'] for artist in spotify_track['artists']])
print(f'Duplicate found: Track "{track_name}" by {artist_names} will be ignored')
else:
output.append(tidal_id)
seen_tracks.add(tidal_id)
return output
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config):
async def search_new_tracks_on_tidal(tidal_session: tidalapi.Session, spotify_tracks: Sequence[t_spotify.SpotifyTrack], playlist_name: str, config: dict):
""" Generic function for searching for each item in a list of Spotify tracks which have not already been seen and adding them to the cache """
async def _run_rate_limiter(semaphore):
''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit'''
_sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket'
@@ -245,6 +253,35 @@ async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalap
t0 = t
[semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket'
# Extract the new tracks that do not already exist in the old tidal tracklist
tracks_to_search = get_new_spotify_tracks(spotify_tracks)
if not tracks_to_search:
return
# Search for each of the tracks on Tidal concurrently
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), playlist_name)
semaphore = asyncio.Semaphore(config.get('max_concurrency', 10))
rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore))
search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description )
rate_limiter_task.cancel()
# Add the search results to the cache
song404 = []
for idx, spotify_track in enumerate(tracks_to_search):
if search_results[idx]:
track_match_cache.insert( (spotify_track['id'], search_results[idx].id) )
else:
song404.append(f"{spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}")
color = ('\033[91m', '\033[0m')
print(color[0] + "Could not find the track " + song404[-1] + color[1])
file_name = "songs not found.txt"
with open(file_name, "a", encoding="utf-8") as file:
for song in song404:
file.write(f"{song}\n")
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config: dict):
""" sync given playlist to tidal """
# Create a new Tidal playlist if required
if not tidal_playlist:
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
@@ -252,30 +289,13 @@ async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalap
# Extract the new tracks from the playlist that we haven't already seen before
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
old_tidal_tracks = tidal_playlist.tracks()
tracks_to_search = get_new_tracks_from_spotify_playlist(spotify_tracks, old_tidal_tracks)
if not tracks_to_search:
print("No new tracks to search in Spotify playlist '{}'".format(spotify_playlist['name']))
return
# Search for each of the tracks on Tidal concurrently
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), spotify_playlist['name'])
semaphore = asyncio.Semaphore(config.get('max_concurrency', 10))
rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore))
search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description )
rate_limiter_task.cancel()
# Add the search results to the cache
for idx, spotify_track in enumerate(tracks_to_search):
if search_results[idx]:
track_match_cache.insert( (spotify_track['id'], search_results[idx].id) )
else:
color = ('\033[91m', '\033[0m')
print(color[0] + "Could not find track {}: {} - {}".format(spotify_track['id'], ",".join([a['name'] for a in spotify_track['artists']]), spotify_track['name']) + color[1])
old_tidal_tracks = await get_all_playlist_tracks(tidal_playlist)
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
await search_new_tracks_on_tidal(tidal_session, spotify_tracks, spotify_playlist['name'], config)
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
# Update the Tidal playlist if there are changes
old_tidal_track_ids = [t.id for t in old_tidal_tracks]
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
if new_tidal_track_ids == old_tidal_track_ids:
print("No changes to write to Tidal playlist")
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
@@ -283,13 +303,51 @@ async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalap
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):])
else:
# Erase old playlist and add new tracks from scratch if any reordering occured
set_tidal_playlist(tidal_playlist, new_tidal_track_ids)
clear_tidal_playlist(tidal_playlist)
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids)
def sync_list(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config):
async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict):
""" sync user favorites to tidal """
async def get_tracks_from_spotify_favorites() -> List[dict]:
_get_favorite_tracks = lambda offset: spotify_session.current_user_saved_tracks(offset=offset)
tracks = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, _get_favorite_tracks)
tracks.reverse()
return tracks
def get_new_tidal_favorites() -> List[int]:
existing_favorite_ids = set([track.id for track in old_tidal_tracks])
new_ids = []
for spotify_track in spotify_tracks:
match_id = track_match_cache.get(spotify_track['id'])
if match_id and not match_id in existing_favorite_ids:
new_ids.append(match_id)
return new_ids
print("Loading favorite tracks from Spotify")
spotify_tracks = await get_tracks_from_spotify_favorites()
print("Loading existing favorite tracks from Tidal")
old_tidal_tracks = await get_all_favorites(tidal_session.user.favorites, order='DATE')
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
await search_new_tracks_on_tidal(tidal_session, spotify_tracks, "Favorites", config)
new_tidal_favorite_ids = get_new_tidal_favorites()
if new_tidal_favorite_ids:
for tidal_id in tqdm(new_tidal_favorite_ids, desc="Adding new tracks to Tidal favorites"):
tidal_session.user.favorites.add_track(tidal_id)
else:
print("No new tracks to add to Tidal favorites")
def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config: dict):
for spotify_playlist, tidal_playlist in playlists:
# sync the spotify playlist to tidal
asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) )
def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config))
def get_tidal_playlists_wrapper(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
tidal_playlists = asyncio.run(get_all_playlists(tidal_session.user))
return {playlist.name: playlist for playlist in tidal_playlists}
def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]):
if spotify_playlist['name'] in tidal_playlists:
# if there's an existing tidal playlist with the name of the current playlist then use that
@@ -301,40 +359,45 @@ def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists:
def get_user_playlist_mappings(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
results = []
spotify_playlists = asyncio.run(get_playlists_from_spotify(spotify_session, config))
tidal_playlists = get_tidal_playlists_dict(tidal_session)
tidal_playlists = get_tidal_playlists_wrapper(tidal_session)
for spotify_playlist in spotify_playlists:
results.append( pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) )
return results
async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config):
# get all the user playlists from the Spotify account
# get all the playlists from the Spotify account
playlists = []
print("Loading Spotify playlists")
results = spotify_session.user_playlists(config['spotify']['username'])
first_results = spotify_session.current_user_playlists()
exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])])
playlists.extend([p for p in results['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list])
playlists.extend([p for p in first_results['items']])
user_id = spotify_session.current_user()['id']
# get all the remaining playlists in parallel
if results['next']:
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.user_playlists, config['spotify']['username'], offset=offset) for offset in offsets ] )
if first_results['next']:
offsets = [ first_results['limit'] * n for n in range(1, math.ceil(first_results['total']/first_results['limit'])) ]
extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.current_user_playlists, offset=offset) for offset in offsets ] )
for extra_result in extra_results:
playlists.extend([p for p in extra_result['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list])
return playlists
playlists.extend([p for p in extra_result['items']])
# filter out playlists that don't belong to us or are on the exclude list
my_playlist_filter = lambda p: p['owner']['id'] == user_id
exclude_filter = lambda p: not p['id'] in exclude_list
return list(filter( exclude_filter, filter( my_playlist_filter, playlists )))
def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
# get the list of playlist sync mappings from the configuration file
def get_playlist_ids(config):
return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']]
output = []
for spotify_id, tidal_id in get_playlist_ids(config):
for spotify_id, tidal_id in get_playlist_ids(config=config):
try:
spotify_playlist = spotify_session.playlist(spotify_id)
spotify_playlist = spotify_session.playlist(playlist_id=spotify_id)
except spotipy.SpotifyException as e:
print(f"Error getting Spotify playlist {spotify_id}")
raise e
try:
tidal_playlist = tidal_session.playlist(tidal_id)
tidal_playlist = tidal_session.playlist(playlist_id=tidal_id)
except Exception as e:
print(f"Error getting Tidal playlist {tidal_id}")
raise e

View File

@@ -1,6 +1,9 @@
import asyncio
import math
from typing import List
import tidalapi
from tqdm import tqdm
from tqdm.asyncio import tqdm as atqdm
def _remove_indices_from_playlist(playlist: tidalapi.UserPlaylist, indices: List[int]):
headers = {'If-None-Match': playlist._etag}
@@ -24,6 +27,53 @@ def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids:
offset += count
progress.update(count)
def set_tidal_playlist(playlist: tidalapi.Playlist, track_ids: List[int]):
clear_tidal_playlist(playlist)
add_multiple_tracks_to_playlist(playlist, track_ids)
async def _get_all_chunks(url, session, parser, params={}) -> List[tidalapi.Track]:
"""
Helper function to get all items from a Tidal endpoint in parallel
The main library doesn't provide the total number of items or expose the raw json, so use this wrapper instead
"""
def _make_request(offset: int=0):
new_params = params
new_params['offset'] = offset
return session.request.map_request(url, params=new_params)
first_chunk_raw = _make_request()
limit = first_chunk_raw['limit']
total = first_chunk_raw['totalNumberOfItems']
items = session.request.map_json(first_chunk_raw, parse=parser)
if len(items) < total:
offsets = [limit * n for n in range(1, math.ceil(total/limit))]
extra_results = await atqdm.gather(
*[asyncio.to_thread(lambda offset: session.request.map_json(_make_request(offset), parse=parser), offset) for offset in offsets],
desc="Fetching additional data chunks"
)
for extra_result in extra_results:
items.extend(extra_result)
return items
async def get_all_favorites(favorites: tidalapi.Favorites, order: str = "NAME", order_direction: str = "ASC", chunk_size: int=100) -> List[tidalapi.Track]:
""" Get all favorites from Tidal playlist in chunks """
params = {
"limit": chunk_size,
"order": order,
"orderDirection": order_direction,
}
return await _get_all_chunks(f"{favorites.base_url}/tracks", session=favorites.session, parser=favorites.session.parse_track, params=params)
async def get_all_playlists(user: tidalapi.User, chunk_size: int=10) -> List[tidalapi.Playlist]:
""" Get all user playlists from Tidal in chunks """
print(f"Loading playlists from Tidal user")
params = {
"limit": chunk_size,
}
return await _get_all_chunks(f"users/{user.id}/playlists", session=user.session, parser=user.playlist.parse_factory, params=params)
async def get_all_playlist_tracks(playlist: tidalapi.Playlist, chunk_size: int=20) -> List[tidalapi.Track]:
""" Get all tracks from Tidal playlist in chunks """
params = {
"limit": chunk_size,
}
print(f"Loading tracks from Tidal playlist '{playlist.name}'")
return await _get_all_chunks(f"{playlist._base_url%playlist.id}/tracks", session=playlist.session, parser=playlist.session.parse_track, params=params)

5
tests/conftest.py Normal file
View File

@@ -0,0 +1,5 @@
import sys
import os
# Add the src directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))

0
tests/unit/__init__.py Normal file
View File

73
tests/unit/test_auth.py Normal file
View File

@@ -0,0 +1,73 @@
# tests/unit/test_auth.py
import pytest
import spotipy
import tidalapi
import yaml
import sys
from unittest import mock
from spotify_to_tidal.auth import open_spotify_session, open_tidal_session, SPOTIFY_SCOPES
def test_open_spotify_session(mocker):
# Mock the SpotifyOAuth class
mock_spotify_oauth = mocker.patch(
"spotify_to_tidal.auth.spotipy.SpotifyOAuth", autospec=True
)
mock_spotify_instance = mocker.patch(
"spotify_to_tidal.auth.spotipy.Spotify", autospec=True
)
# Define a mock configuration
mock_config = {
"username": "test_user",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"redirect_uri": "http://localhost/",
}
# Create a mock SpotifyOAuth instance
mock_oauth_instance = mock_spotify_oauth.return_value
mock_oauth_instance.get_access_token.return_value = "mock_access_token"
# Call the function under test
spotify_instance = open_spotify_session(mock_config)
# Assert that the SpotifyOAuth was called with correct parameters
mock_spotify_oauth.assert_called_once_with(
username="test_user",
scope=SPOTIFY_SCOPES,
client_id="test_client_id",
client_secret="test_client_secret",
redirect_uri="http://localhost/",
requests_timeout=2,
)
# Assert that the Spotify instance was created
mock_spotify_instance.assert_called_once_with(oauth_manager=mock_oauth_instance)
assert spotify_instance == mock_spotify_instance.return_value
def test_open_spotify_session_oauth_error(mocker):
# Mock the SpotifyOAuth class and simulate an OAuth error
mock_spotify_oauth = mocker.patch(
"spotify_to_tidal.auth.spotipy.SpotifyOAuth", autospec=True
)
mock_spotify_oauth.return_value.get_access_token.side_effect = (
spotipy.SpotifyOauthError("mock error")
)
# Define a mock configuration
mock_config = {
"username": "test_user",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"redirect_uri": "http://localhost/",
}
# Mock sys.exit to prevent the test from exiting
mock_sys_exit = mocker.patch("sys.exit")
# Call the function under test and assert sys.exit is called
open_spotify_session(mock_config)
mock_sys_exit.assert_called_once()

80
tests/unit/test_cache.py Normal file
View File

@@ -0,0 +1,80 @@
# tests/unit/test_cache.py
import pytest
import datetime
import sqlalchemy
from sqlalchemy import create_engine, select
from unittest import mock
from spotify_to_tidal.cache import MatchFailureDatabase, TrackMatchCache
# Setup an in-memory SQLite database for testing
@pytest.fixture
def in_memory_db():
engine = create_engine("sqlite:///:memory:")
return engine
# Test MatchFailureDatabase
def test_cache_match_failure(in_memory_db, mocker):
mocker.patch(
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
)
failure_db = MatchFailureDatabase()
track_id = "test_track"
failure_db.cache_match_failure(track_id)
with failure_db.engine.connect() as connection:
result = connection.execute(
select(failure_db.match_failures).where(
failure_db.match_failures.c.track_id == track_id
)
).fetchone()
assert result is not None
assert result.track_id == track_id
def test_has_match_failure(in_memory_db, mocker):
mocker.patch(
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
)
failure_db = MatchFailureDatabase()
track_id = "test_track"
failure_db.cache_match_failure(track_id)
assert failure_db.has_match_failure(track_id) is True
def test_remove_match_failure(in_memory_db, mocker):
mocker.patch(
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
)
failure_db = MatchFailureDatabase()
track_id = "test_track"
failure_db.cache_match_failure(track_id)
failure_db.remove_match_failure(track_id)
with failure_db.engine.connect() as connection:
result = connection.execute(
select(failure_db.match_failures).where(
failure_db.match_failures.c.track_id == track_id
)
).fetchone()
assert result is None
# Test TrackMatchCache
def test_track_match_cache_insert():
track_cache = TrackMatchCache()
track_cache.insert(("spotify_id", 123))
assert track_cache.get("spotify_id") == 123
def test_track_match_cache_get():
track_cache = TrackMatchCache()
track_cache.insert(("spotify_id", 123))
assert track_cache.get("spotify_id") == 123
assert track_cache.get("nonexistent_id") is None