Compare commits
38 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d90c5bac41 | ||
|
|
e62a8a80cb | ||
|
|
a5afd975f0 | ||
|
|
03e0396ac0 | ||
|
|
a438cda72b | ||
|
|
2f1985a42b | ||
|
|
bcf2bbca0d | ||
|
|
457da1724f | ||
|
|
4d7c3b0ef0 | ||
|
|
4fb702d008 | ||
|
|
693dcd110f | ||
|
|
17d60c019d | ||
|
|
7148053ad4 | ||
|
|
64ea6f3df0 | ||
|
|
da1bc2eff9 | ||
|
|
5e3645319f | ||
|
|
af0fd7a2cf | ||
|
|
4f023d024b | ||
|
|
54bb7097a7 | ||
|
|
692939cd0e | ||
|
|
ced8db44fe | ||
|
|
193e74b775 | ||
|
|
7a1343ac91 | ||
|
|
6d6a4fe43e | ||
|
|
1cdc62fd0c | ||
|
|
54526a0306 | ||
|
|
fab154851b | ||
|
|
b6340790ca | ||
|
|
8692624a8c | ||
|
|
dc959f2657 | ||
|
|
d9312d89dd | ||
|
|
1a7ff4f083 | ||
|
|
0c859cc9aa | ||
|
|
bb0f3cffd0 | ||
|
|
ecc642ba7d | ||
|
|
3e9b2ef0ec | ||
|
|
a16f764bee | ||
|
|
c1956d19cc |
38
.circleci/config.yml
Normal file
38
.circleci/config.yml
Normal file
@@ -0,0 +1,38 @@
|
||||
version: 2.1
|
||||
|
||||
jobs:
|
||||
setup:
|
||||
docker:
|
||||
- image: circleci/python:3.10
|
||||
steps:
|
||||
- checkout
|
||||
- run:
|
||||
name: Install dependencies
|
||||
command: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install -e .
|
||||
pip install pytest pytest-mock
|
||||
|
||||
test:
|
||||
docker:
|
||||
- image: circleci/python:3.10
|
||||
steps:
|
||||
- checkout
|
||||
- run:
|
||||
name: Install dependencies
|
||||
command: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install -e .
|
||||
pip install pytest pytest-mock
|
||||
- run:
|
||||
name: Run tests
|
||||
command: pytest
|
||||
|
||||
workflows:
|
||||
version: 2
|
||||
test_workflow:
|
||||
jobs:
|
||||
- setup
|
||||
- test:
|
||||
requires:
|
||||
- setup
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -30,3 +30,6 @@ venv/
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Visual Studio Code
|
||||
.vscode/
|
||||
|
||||
@@ -2,7 +2,8 @@ spotify:
|
||||
client_id: your_client_id
|
||||
client_secret: your_client_secret
|
||||
username: your_spotify_username
|
||||
redirect_uri: http://localhost:8888/callback
|
||||
redirect_uri: http://127.0.0.1:8888/callback
|
||||
open_browser: True # Set to False if using a headless server environment
|
||||
|
||||
|
||||
# uncomment this block if you want to only sync specific playlist IDs
|
||||
@@ -14,6 +15,11 @@ spotify:
|
||||
#excluded_playlists:
|
||||
# - spotify:playlist:1ABCDEqsABCD6EaABCDa0a
|
||||
|
||||
# default setting for syncing favorites when no command line arguments are provided
|
||||
# - when true: favorites will be synced by default (overriden when any command line arg provided)
|
||||
# - when false: favorites can only be synced manually via --sync-favorites argument
|
||||
sync_favorites_default: true
|
||||
|
||||
# increasing these parameters should increase the search speed, while decreasing reduces likelihood of 429 errors
|
||||
max_concurrency: 10 # max concurrent connections at any given time
|
||||
rate_limit: 12 # max sustained connections per second
|
||||
rate_limit: 10 # max sustained connections per second
|
||||
|
||||
@@ -4,15 +4,17 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "spotify_to_tidal"
|
||||
version = "0.1.1"
|
||||
version = "1.0.4"
|
||||
requires-python = ">= 3.10"
|
||||
|
||||
dependencies = [
|
||||
"spotipy~=2.21.0",
|
||||
"tidalapi==0.7.6",
|
||||
"spotipy~=2.24.0",
|
||||
"tidalapi==0.8.8",
|
||||
"pyyaml~=6.0",
|
||||
"tqdm~=4.64",
|
||||
"sqlalchemy~=2.0"
|
||||
"sqlalchemy~=2.0",
|
||||
"pytest~=8.0",
|
||||
"pytest-mock~=3.8"
|
||||
]
|
||||
|
||||
[tools.setuptools.packages."spotify_to_tidal"]
|
||||
|
||||
6
pytest.ini
Normal file
6
pytest.ini
Normal file
@@ -0,0 +1,6 @@
|
||||
[pytest]
|
||||
addopts = --maxfail=1 --disable-warnings
|
||||
testpaths = tests
|
||||
python_files = test_*.py
|
||||
python_classes = Test*
|
||||
python_functions = test_*
|
||||
@@ -1,4 +1,4 @@
|
||||
A command line tool for importing your Spotify playlists into Tidal
|
||||
A command line tool for importing your Spotify playlists into Tidal. Due to various performance optimisations, it is particularly suited for periodic synchronisation of very large collections.
|
||||
|
||||
Installation
|
||||
-----------
|
||||
@@ -19,6 +19,7 @@ Setup
|
||||
Usage
|
||||
----
|
||||
To synchronize all of your Spotify playlists with your Tidal account run the following from the project root directory
|
||||
Windows ignores python module paths by default, but you can run them using `python3 -m spotify_to_tidal`
|
||||
|
||||
```bash
|
||||
spotify_to_tidal
|
||||
@@ -30,6 +31,12 @@ You can also just synchronize a specific playlist by doing the following:
|
||||
spotify_to_tidal --uri 1ABCDEqsABCD6EaABCDa0a # accepts playlist id or full playlist uri
|
||||
```
|
||||
|
||||
or sync just your 'Liked Songs' with:
|
||||
|
||||
```bash
|
||||
spotify_to_tidal --sync-favorites
|
||||
```
|
||||
|
||||
See example_config.yml for more configuration options, and `spotify_to_tidal --help` for more options.
|
||||
|
||||
---
|
||||
|
||||
@@ -9,6 +9,7 @@ def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--config', default='config.yml', help='location of the config file')
|
||||
parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config')
|
||||
parser.add_argument('--sync-favorites', action=argparse.BooleanOptionalAction, help='synchronize the favorites')
|
||||
args = parser.parse_args()
|
||||
|
||||
with open(args.config, 'r') as f:
|
||||
@@ -22,15 +23,23 @@ def main():
|
||||
if args.uri:
|
||||
# if a playlist ID is explicitly provided as a command line argument then use that
|
||||
spotify_playlist = spotify_session.playlist(args.uri)
|
||||
tidal_playlists = _sync.get_tidal_playlists_dict(tidal_session)
|
||||
tidal_playlists = _sync.get_tidal_playlists_wrapper(tidal_session)
|
||||
tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists)
|
||||
_sync.sync_list(spotify_session, tidal_session, [tidal_playlist], config)
|
||||
_sync.sync_playlists_wrapper(spotify_session, tidal_session, [tidal_playlist], config)
|
||||
sync_favorites = args.sync_favorites # only sync favorites if command line argument explicitly passed
|
||||
elif args.sync_favorites:
|
||||
sync_favorites = True # sync only the favorites
|
||||
elif config.get('sync_playlists', None):
|
||||
# if the config contains a sync_playlists list of mappings then use that
|
||||
_sync.sync_list(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
|
||||
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
|
||||
sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True)
|
||||
else:
|
||||
# otherwise just use the user playlists in the Spotify account
|
||||
_sync.sync_list(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
|
||||
# otherwise sync all the user playlists in the Spotify account and favorites unless explicitly disabled
|
||||
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
|
||||
sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True)
|
||||
|
||||
if sync_favorites:
|
||||
_sync.sync_favorites_wrapper(spotify_session, tidal_session, config)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
@@ -11,13 +11,16 @@ __all__ = [
|
||||
'open_tidal_session'
|
||||
]
|
||||
|
||||
SPOTIFY_SCOPES = 'playlist-read-private, user-library-read'
|
||||
|
||||
def open_spotify_session(config) -> spotipy.Spotify:
|
||||
credentials_manager = spotipy.SpotifyOAuth(username=config['username'],
|
||||
scope='playlist-read-private',
|
||||
client_id=config['client_id'],
|
||||
client_secret=config['client_secret'],
|
||||
redirect_uri=config['redirect_uri'],
|
||||
requests_timeout=2)
|
||||
scope=SPOTIFY_SCOPES,
|
||||
client_id=config['client_id'],
|
||||
client_secret=config['client_secret'],
|
||||
redirect_uri=config['redirect_uri'],
|
||||
requests_timeout=2,
|
||||
open_browser=config.get('open_browser', True))
|
||||
try:
|
||||
credentials_manager.get_access_token(as_dict=False)
|
||||
except spotipy.SpotifyOauthError:
|
||||
|
||||
@@ -2,16 +2,19 @@
|
||||
|
||||
import asyncio
|
||||
from .cache import failure_cache, track_match_cache
|
||||
import datetime
|
||||
from difflib import SequenceMatcher
|
||||
from functools import partial
|
||||
from typing import List, Sequence, Set, Mapping
|
||||
from typing import Callable, List, Sequence, Set, Mapping
|
||||
import math
|
||||
import requests
|
||||
import sys
|
||||
import spotipy
|
||||
import tidalapi
|
||||
from .tidalapi_patch import add_multiple_tracks_to_playlist, set_tidal_playlist
|
||||
from .tidalapi_patch import add_multiple_tracks_to_playlist, clear_tidal_playlist, get_all_favorites, get_all_playlists, get_all_playlist_tracks
|
||||
import time
|
||||
from tqdm.asyncio import tqdm as atqdm
|
||||
from tqdm import tqdm
|
||||
import traceback
|
||||
import unicodedata
|
||||
import math
|
||||
@@ -50,7 +53,7 @@ def name_match(tidal_track, spotify_track) -> bool:
|
||||
simple_spotify_track = simple(spotify_track['name'].lower()).split('feat.')[0].strip()
|
||||
return simple_spotify_track in tidal_track.name.lower() or normalize(simple_spotify_track) in normalize(tidal_track.name.lower())
|
||||
|
||||
def artist_match(tidal_track: tidalapi.Track, spotify_track) -> bool:
|
||||
def artist_match(tidal: tidalapi.Track | tidalapi.Album, spotify) -> bool:
|
||||
def split_artist_name(artist: str) -> Sequence[str]:
|
||||
if '&' in artist:
|
||||
return artist.split('&')
|
||||
@@ -59,9 +62,9 @@ def artist_match(tidal_track: tidalapi.Track, spotify_track) -> bool:
|
||||
else:
|
||||
return [artist]
|
||||
|
||||
def get_tidal_artists(tidal_track: tidalapi.Track, do_normalize=False) -> Set[str]:
|
||||
def get_tidal_artists(tidal: tidalapi.Track | tidalapi.Album, do_normalize=False) -> Set[str]:
|
||||
result: list[str] = []
|
||||
for artist in tidal_track.artists:
|
||||
for artist in tidal.artists:
|
||||
if do_normalize:
|
||||
artist_name = normalize(artist.name)
|
||||
else:
|
||||
@@ -69,9 +72,9 @@ def artist_match(tidal_track: tidalapi.Track, spotify_track) -> bool:
|
||||
result.extend(split_artist_name(artist_name))
|
||||
return set([simple(x.strip().lower()) for x in result])
|
||||
|
||||
def get_spotify_artists(spotify_track: t_spotify.SpotifyTrack, do_normalize=False) -> Set[str]:
|
||||
def get_spotify_artists(spotify, do_normalize=False) -> Set[str]:
|
||||
result: list[str] = []
|
||||
for artist in spotify_track['artists']:
|
||||
for artist in spotify['artists']:
|
||||
if do_normalize:
|
||||
artist_name = normalize(artist['name'])
|
||||
else:
|
||||
@@ -80,32 +83,42 @@ def artist_match(tidal_track: tidalapi.Track, spotify_track) -> bool:
|
||||
return set([simple(x.strip().lower()) for x in result])
|
||||
# There must be at least one overlapping artist between the Tidal and Spotify track
|
||||
# Try with both un-normalized and then normalized
|
||||
if get_tidal_artists(tidal_track).intersection(get_spotify_artists(spotify_track)) != set():
|
||||
if get_tidal_artists(tidal).intersection(get_spotify_artists(spotify)) != set():
|
||||
return True
|
||||
return get_tidal_artists(tidal_track, True).intersection(get_spotify_artists(spotify_track, True)) != set()
|
||||
return get_tidal_artists(tidal, True).intersection(get_spotify_artists(spotify, True)) != set()
|
||||
|
||||
def match(tidal_track, spotify_track) -> bool:
|
||||
if not spotify_track['id']: return False
|
||||
return isrc_match(tidal_track, spotify_track) or (
|
||||
duration_match(tidal_track, spotify_track)
|
||||
and name_match(tidal_track, spotify_track)
|
||||
and artist_match(tidal_track, spotify_track)
|
||||
)
|
||||
|
||||
def test_album_similarity(spotify_album, tidal_album, threshold=0.6):
|
||||
return SequenceMatcher(None, simple(spotify_album['name']), simple(tidal_album.name)).ratio() >= threshold and artist_match(tidal_album, spotify_album)
|
||||
|
||||
async def tidal_search(spotify_track, rate_limiter, tidal_session: tidalapi.Session) -> tidalapi.Track | None:
|
||||
def _search_for_track_in_album():
|
||||
# search for album name and first album artist
|
||||
if 'album' in spotify_track and 'artists' in spotify_track['album'] and len(spotify_track['album']['artists']):
|
||||
album_result = tidal_session.search(simple(spotify_track['album']['name']) + " " + simple(spotify_track['album']['artists'][0]['name']), models=[tidalapi.album.Album])
|
||||
query = simple(spotify_track['album']['name']) + " " + simple(spotify_track['album']['artists'][0]['name'])
|
||||
album_result = tidal_session.search(query, models=[tidalapi.album.Album])
|
||||
for album in album_result['albums']:
|
||||
album_tracks = album.tracks()
|
||||
if len(album_tracks) >= spotify_track['track_number']:
|
||||
if album.num_tracks >= spotify_track['track_number'] and test_album_similarity(spotify_track['album'], album):
|
||||
album_tracks = album.tracks()
|
||||
if len(album_tracks) < spotify_track['track_number']:
|
||||
assert( not len(album_tracks) == album.num_tracks ) # incorrect metadata :(
|
||||
continue
|
||||
track = album_tracks[spotify_track['track_number'] - 1]
|
||||
if match(track, spotify_track):
|
||||
failure_cache.remove_match_failure(spotify_track['id'])
|
||||
return track
|
||||
|
||||
def _search_for_standalone_track():
|
||||
# if album search fails then search for track name and first artist
|
||||
for track in tidal_session.search(simple(spotify_track['name']) + ' ' + simple(spotify_track['artists'][0]['name']), models=[tidalapi.media.Track])['tracks']:
|
||||
query = simple(spotify_track['name']) + ' ' + simple(spotify_track['artists'][0]['name'])
|
||||
for track in tidal_session.search(query, models=[tidalapi.media.Track])['tracks']:
|
||||
if match(track, spotify_track):
|
||||
failure_cache.remove_match_failure(spotify_track['id'])
|
||||
return track
|
||||
@@ -117,25 +130,15 @@ async def tidal_search(spotify_track, rate_limiter, tidal_session: tidalapi.Sess
|
||||
track_search = await asyncio.to_thread( _search_for_standalone_track )
|
||||
if track_search:
|
||||
return track_search
|
||||
return None
|
||||
|
||||
# if none of the search modes succeeded then store the track id to the failure cache
|
||||
failure_cache.cache_match_failure(spotify_track['id'])
|
||||
|
||||
def get_tidal_playlists_dict(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
|
||||
# a dictionary of name --> playlist
|
||||
print("Loading Tidal playlists... This may take some time.")
|
||||
tidal_playlists = tidal_session.user.playlists()
|
||||
output = {}
|
||||
for playlist in tidal_playlists:
|
||||
output[playlist.name] = playlist
|
||||
return output
|
||||
|
||||
async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
|
||||
# utility to repeat calling the function up to 5 times if an exception is thrown
|
||||
try:
|
||||
return await function(*args, **kwargs)
|
||||
except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException) as e:
|
||||
except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException, spotipy.exceptions.SpotifyException) as e:
|
||||
if remaining:
|
||||
print(f"{str(e)} occurred, retrying {remaining} times")
|
||||
else:
|
||||
@@ -154,36 +157,52 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
|
||||
time.sleep(sleep_schedule.get(remaining, 1))
|
||||
return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs)
|
||||
|
||||
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
|
||||
def _get_tracks_from_spotify_playlist(offset: int, spotify_session: spotipy.Spotify, playlist_id: str):
|
||||
fields="next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
|
||||
return spotify_session.playlist_tracks(playlist_id, fields, offset=offset)
|
||||
|
||||
async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[dict]:
|
||||
output = []
|
||||
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
|
||||
results = _get_tracks_from_spotify_playlist( 0, spotify_session, spotify_playlist["id"] )
|
||||
output.extend([r['track'] for r in results['items'] if r['track'] is not None])
|
||||
results = fetch_function(0)
|
||||
output.extend([item['track'] for item in results['items'] if item['track'] is not None])
|
||||
|
||||
# get all the remaining tracks in parallel
|
||||
# Get all the remaining tracks in parallel
|
||||
if results['next']:
|
||||
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
|
||||
extra_results = await atqdm.gather( *[asyncio.to_thread(_get_tracks_from_spotify_playlist, offset, spotify_session=spotify_session, playlist_id=spotify_playlist["id"]) for offset in offsets ] )
|
||||
offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))]
|
||||
extra_results = await atqdm.gather(
|
||||
*[asyncio.to_thread(fetch_function, offset) for offset in offsets],
|
||||
desc="Fetching additional data chunks"
|
||||
)
|
||||
for extra_result in extra_results:
|
||||
output.extend([r['track'] for r in extra_result['items'] if r['track'] is not None])
|
||||
output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None])
|
||||
|
||||
return output
|
||||
|
||||
|
||||
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
|
||||
def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str):
|
||||
fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc))),type"
|
||||
return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset)
|
||||
|
||||
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
|
||||
items = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, lambda offset: _get_tracks_from_spotify_playlist(offset=offset, playlist_id=spotify_playlist["id"]))
|
||||
track_filter = lambda item: item.get('type', 'track') == 'track' # type may be 'episode' also
|
||||
sanity_filter = lambda item: ('album' in item
|
||||
and 'name' in item['album']
|
||||
and 'artists' in item['album']
|
||||
and len(item['album']['artists']) > 0
|
||||
and item['album']['artists'][0]['name'] is not None)
|
||||
return list(filter(sanity_filter, filter(track_filter, items)))
|
||||
|
||||
def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]):
|
||||
""" Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """
|
||||
def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack):
|
||||
for idx, tidal_track in list(enumerate(tidal_tracks)):
|
||||
if match(tidal_track, spotify_track):
|
||||
if tidal_track.available and match(tidal_track, spotify_track):
|
||||
track_match_cache.insert((spotify_track['id'], tidal_track.id))
|
||||
tidal_tracks.pop(idx)
|
||||
return
|
||||
|
||||
def _populate_one_track_from_tidal(tidal_track: tidalapi.Track):
|
||||
for idx, spotify_track in list(enumerate(spotify_tracks)):
|
||||
if match(tidal_track, spotify_track):
|
||||
if tidal_track.available and match(tidal_track, spotify_track):
|
||||
track_match_cache.insert((spotify_track['id'], tidal_track.id))
|
||||
spotify_tracks.pop(idx)
|
||||
return
|
||||
@@ -199,9 +218,8 @@ def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack]
|
||||
for track in spotify_tracks:
|
||||
_populate_one_track_from_spotify(track)
|
||||
|
||||
def get_new_tracks_from_spotify_playlist(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> list[t_spotify.SpotifyTrack]:
|
||||
''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures '''
|
||||
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
|
||||
def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack]) -> List[t_spotify.SpotifyTrack]:
|
||||
''' Extracts only the tracks that have not already been seen in our Tidal caches '''
|
||||
results = []
|
||||
for spotify_track in spotify_tracks:
|
||||
if not spotify_track['id']: continue
|
||||
@@ -213,51 +231,81 @@ def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.Spotify
|
||||
''' gets list of corresponding tidal track ids for each spotify track, ignoring duplicates '''
|
||||
output = []
|
||||
seen_tracks = set()
|
||||
|
||||
for spotify_track in spotify_tracks:
|
||||
if not spotify_track['id']: continue
|
||||
tidal_id = track_match_cache.get(spotify_track['id'])
|
||||
if tidal_id and not tidal_id in seen_tracks:
|
||||
output.append(tidal_id)
|
||||
seen_tracks.add(tidal_id)
|
||||
if tidal_id:
|
||||
if tidal_id in seen_tracks:
|
||||
track_name = spotify_track['name']
|
||||
artist_names = ', '.join([artist['name'] for artist in spotify_track['artists']])
|
||||
print(f'Duplicate found: Track "{track_name}" by {artist_names} will be ignored')
|
||||
else:
|
||||
output.append(tidal_id)
|
||||
seen_tracks.add(tidal_id)
|
||||
return output
|
||||
|
||||
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config):
|
||||
async def search_new_tracks_on_tidal(tidal_session: tidalapi.Session, spotify_tracks: Sequence[t_spotify.SpotifyTrack], playlist_name: str, config: dict):
|
||||
""" Generic function for searching for each item in a list of Spotify tracks which have not already been seen and adding them to the cache """
|
||||
async def _run_rate_limiter(semaphore):
|
||||
''' Leaky bucket algorithm for rate limiting. Periodically releases an item from semaphore at rate_limit'''
|
||||
''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit'''
|
||||
_sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket'
|
||||
t0 = datetime.datetime.now()
|
||||
while True:
|
||||
await asyncio.sleep(1/config.get('rate_limit', 12)) # sleep for min time between new function executions
|
||||
semaphore.release() # leak one item from the 'bucket'
|
||||
await asyncio.sleep(_sleep_time)
|
||||
t = datetime.datetime.now()
|
||||
dt = (t - t0).total_seconds()
|
||||
new_items = round(config.get('rate_limit', 10)*dt)
|
||||
t0 = t
|
||||
[semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket'
|
||||
|
||||
# Create a new Tidal playlist if required
|
||||
if not tidal_playlist:
|
||||
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
|
||||
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])
|
||||
|
||||
# Extract the new tracks from the playlist that we haven't already seen before
|
||||
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
|
||||
old_tidal_tracks = tidal_playlist.tracks()
|
||||
tracks_to_search = get_new_tracks_from_spotify_playlist(spotify_tracks, old_tidal_tracks)
|
||||
# Extract the new tracks that do not already exist in the old tidal tracklist
|
||||
tracks_to_search = get_new_spotify_tracks(spotify_tracks)
|
||||
if not tracks_to_search:
|
||||
print("No new tracks to search in Spotify playlist '{}'".format(spotify_playlist['name']))
|
||||
return
|
||||
|
||||
# Search for each of the tracks on Tidal concurrently
|
||||
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), spotify_playlist['name'])
|
||||
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), playlist_name)
|
||||
semaphore = asyncio.Semaphore(config.get('max_concurrency', 10))
|
||||
rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore))
|
||||
search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description )
|
||||
rate_limiter_task.cancel()
|
||||
|
||||
# Add the search results to the cache
|
||||
song404 = []
|
||||
for idx, spotify_track in enumerate(tracks_to_search):
|
||||
if search_results[idx]:
|
||||
track_match_cache.insert( (spotify_track['id'], search_results[idx].id) )
|
||||
else:
|
||||
song404.append(f"{spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}")
|
||||
color = ('\033[91m', '\033[0m')
|
||||
print(color[0] + "Could not find track {}: {} - {}".format(spotify_track['id'], ",".join([a['name'] for a in spotify_track['artists']]), spotify_track['name']) + color[1])
|
||||
print(color[0] + "Could not find the track " + song404[-1] + color[1])
|
||||
file_name = "songs not found.txt"
|
||||
with open(file_name, "a", encoding="utf-8") as file:
|
||||
for song in song404:
|
||||
file.write(f"{song}\n")
|
||||
|
||||
|
||||
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config: dict):
|
||||
""" sync given playlist to tidal """
|
||||
# Get the tracks from both Spotify and Tidal, creating a new Tidal playlist if necessary
|
||||
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
|
||||
if len(spotify_tracks) == 0:
|
||||
return # nothing to do
|
||||
if tidal_playlist:
|
||||
old_tidal_tracks = await get_all_playlist_tracks(tidal_playlist)
|
||||
else:
|
||||
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
|
||||
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])
|
||||
old_tidal_tracks = []
|
||||
|
||||
# Extract the new tracks from the playlist that we haven't already seen before
|
||||
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
|
||||
await search_new_tracks_on_tidal(tidal_session, spotify_tracks, spotify_playlist['name'], config)
|
||||
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
|
||||
|
||||
# Update the Tidal playlist if there are changes
|
||||
old_tidal_track_ids = [t.id for t in old_tidal_tracks]
|
||||
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
|
||||
if new_tidal_track_ids == old_tidal_track_ids:
|
||||
print("No changes to write to Tidal playlist")
|
||||
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
|
||||
@@ -265,13 +313,51 @@ async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalap
|
||||
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):])
|
||||
else:
|
||||
# Erase old playlist and add new tracks from scratch if any reordering occured
|
||||
set_tidal_playlist(tidal_playlist, new_tidal_track_ids)
|
||||
clear_tidal_playlist(tidal_playlist)
|
||||
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids)
|
||||
|
||||
def sync_list(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config):
|
||||
async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict):
|
||||
""" sync user favorites to tidal """
|
||||
async def get_tracks_from_spotify_favorites() -> List[dict]:
|
||||
_get_favorite_tracks = lambda offset: spotify_session.current_user_saved_tracks(offset=offset)
|
||||
tracks = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, _get_favorite_tracks)
|
||||
tracks.reverse()
|
||||
return tracks
|
||||
|
||||
def get_new_tidal_favorites() -> List[int]:
|
||||
existing_favorite_ids = set([track.id for track in old_tidal_tracks])
|
||||
new_ids = []
|
||||
for spotify_track in spotify_tracks:
|
||||
match_id = track_match_cache.get(spotify_track['id'])
|
||||
if match_id and not match_id in existing_favorite_ids:
|
||||
new_ids.append(match_id)
|
||||
return new_ids
|
||||
|
||||
print("Loading favorite tracks from Spotify")
|
||||
spotify_tracks = await get_tracks_from_spotify_favorites()
|
||||
print("Loading existing favorite tracks from Tidal")
|
||||
old_tidal_tracks = await get_all_favorites(tidal_session.user.favorites, order='DATE')
|
||||
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
|
||||
await search_new_tracks_on_tidal(tidal_session, spotify_tracks, "Favorites", config)
|
||||
new_tidal_favorite_ids = get_new_tidal_favorites()
|
||||
if new_tidal_favorite_ids:
|
||||
for tidal_id in tqdm(new_tidal_favorite_ids, desc="Adding new tracks to Tidal favorites"):
|
||||
tidal_session.user.favorites.add_track(tidal_id)
|
||||
else:
|
||||
print("No new tracks to add to Tidal favorites")
|
||||
|
||||
def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config: dict):
|
||||
for spotify_playlist, tidal_playlist in playlists:
|
||||
# sync the spotify playlist to tidal
|
||||
asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) )
|
||||
|
||||
def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
|
||||
asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config))
|
||||
|
||||
def get_tidal_playlists_wrapper(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
|
||||
tidal_playlists = asyncio.run(get_all_playlists(tidal_session.user))
|
||||
return {playlist.name: playlist for playlist in tidal_playlists}
|
||||
|
||||
def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]):
|
||||
if spotify_playlist['name'] in tidal_playlists:
|
||||
# if there's an existing tidal playlist with the name of the current playlist then use that
|
||||
@@ -283,39 +369,45 @@ def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists:
|
||||
def get_user_playlist_mappings(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
|
||||
results = []
|
||||
spotify_playlists = asyncio.run(get_playlists_from_spotify(spotify_session, config))
|
||||
tidal_playlists = get_tidal_playlists_dict(tidal_session)
|
||||
tidal_playlists = get_tidal_playlists_wrapper(tidal_session)
|
||||
for spotify_playlist in spotify_playlists:
|
||||
results.append( pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) )
|
||||
return results
|
||||
|
||||
async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config):
|
||||
# get all the user playlists from the Spotify account
|
||||
# get all the playlists from the Spotify account
|
||||
playlists = []
|
||||
print("Loading Spotify playlists")
|
||||
results = spotify_session.user_playlists(config['spotify']['username'])
|
||||
first_results = spotify_session.current_user_playlists()
|
||||
exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])])
|
||||
|
||||
playlists.extend([p for p in first_results['items']])
|
||||
user_id = spotify_session.current_user()['id']
|
||||
|
||||
# get all the remaining playlists in parallel
|
||||
if results['next']:
|
||||
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
|
||||
extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.user_playlists, config['spotify']['username'], offset=offset) for offset in offsets ] )
|
||||
if first_results['next']:
|
||||
offsets = [ first_results['limit'] * n for n in range(1, math.ceil(first_results['total']/first_results['limit'])) ]
|
||||
extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.current_user_playlists, offset=offset) for offset in offsets ] )
|
||||
for extra_result in extra_results:
|
||||
playlists.extend([p for p in extra_result['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list])
|
||||
return playlists
|
||||
playlists.extend([p for p in extra_result['items']])
|
||||
|
||||
# filter out playlists that don't belong to us or are on the exclude list
|
||||
my_playlist_filter = lambda p: p and p['owner']['id'] == user_id
|
||||
exclude_filter = lambda p: not p['id'] in exclude_list
|
||||
return list(filter( exclude_filter, filter( my_playlist_filter, playlists )))
|
||||
|
||||
def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
|
||||
# get the list of playlist sync mappings from the configuration file
|
||||
def get_playlist_ids(config):
|
||||
return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']]
|
||||
output = []
|
||||
for spotify_id, tidal_id in get_playlist_ids(config):
|
||||
for spotify_id, tidal_id in get_playlist_ids(config=config):
|
||||
try:
|
||||
spotify_playlist = spotify_session.playlist(spotify_id)
|
||||
spotify_playlist = spotify_session.playlist(playlist_id=spotify_id)
|
||||
except spotipy.SpotifyException as e:
|
||||
print(f"Error getting Spotify playlist {spotify_id}")
|
||||
raise e
|
||||
try:
|
||||
tidal_playlist = tidal_session.playlist(tidal_id)
|
||||
tidal_playlist = tidal_session.playlist(playlist_id=tidal_id)
|
||||
except Exception as e:
|
||||
print(f"Error getting Tidal playlist {tidal_id}")
|
||||
raise e
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
import asyncio
|
||||
import math
|
||||
from typing import List
|
||||
import tidalapi
|
||||
from tqdm import tqdm
|
||||
from tqdm.asyncio import tqdm as atqdm
|
||||
|
||||
def _remove_indices_from_playlist(playlist: tidalapi.UserPlaylist, indices: List[int]):
|
||||
headers = {'If-None-Match': playlist._etag}
|
||||
@@ -24,6 +27,53 @@ def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids:
|
||||
offset += count
|
||||
progress.update(count)
|
||||
|
||||
def set_tidal_playlist(playlist: tidalapi.Playlist, track_ids: List[int]):
|
||||
clear_tidal_playlist(playlist)
|
||||
add_multiple_tracks_to_playlist(playlist, track_ids)
|
||||
async def _get_all_chunks(url, session, parser, params={}) -> List[tidalapi.Track]:
|
||||
"""
|
||||
Helper function to get all items from a Tidal endpoint in parallel
|
||||
The main library doesn't provide the total number of items or expose the raw json, so use this wrapper instead
|
||||
"""
|
||||
def _make_request(offset: int=0):
|
||||
new_params = params
|
||||
new_params['offset'] = offset
|
||||
return session.request.map_request(url, params=new_params)
|
||||
|
||||
first_chunk_raw = _make_request()
|
||||
limit = first_chunk_raw['limit']
|
||||
total = first_chunk_raw['totalNumberOfItems']
|
||||
items = session.request.map_json(first_chunk_raw, parse=parser)
|
||||
|
||||
if len(items) < total:
|
||||
offsets = [limit * n for n in range(1, math.ceil(total/limit))]
|
||||
extra_results = await atqdm.gather(
|
||||
*[asyncio.to_thread(lambda offset: session.request.map_json(_make_request(offset), parse=parser), offset) for offset in offsets],
|
||||
desc="Fetching additional data chunks"
|
||||
)
|
||||
for extra_result in extra_results:
|
||||
items.extend(extra_result)
|
||||
return items
|
||||
|
||||
async def get_all_favorites(favorites: tidalapi.Favorites, order: str = "NAME", order_direction: str = "ASC", chunk_size: int=100) -> List[tidalapi.Track]:
|
||||
""" Get all favorites from Tidal playlist in chunks """
|
||||
params = {
|
||||
"limit": chunk_size,
|
||||
"order": order,
|
||||
"orderDirection": order_direction,
|
||||
}
|
||||
return await _get_all_chunks(f"{favorites.base_url}/tracks", session=favorites.session, parser=favorites.session.parse_track, params=params)
|
||||
|
||||
async def get_all_playlists(user: tidalapi.User, chunk_size: int=10) -> List[tidalapi.Playlist]:
|
||||
""" Get all user playlists from Tidal in chunks """
|
||||
print(f"Loading playlists from Tidal user")
|
||||
params = {
|
||||
"limit": chunk_size,
|
||||
}
|
||||
return await _get_all_chunks(f"users/{user.id}/playlists", session=user.session, parser=user.playlist.parse_factory, params=params)
|
||||
|
||||
async def get_all_playlist_tracks(playlist: tidalapi.Playlist, chunk_size: int=20) -> List[tidalapi.Track]:
|
||||
""" Get all tracks from Tidal playlist in chunks """
|
||||
params = {
|
||||
"limit": chunk_size,
|
||||
}
|
||||
print(f"Loading tracks from Tidal playlist '{playlist.name}'")
|
||||
return await _get_all_chunks(f"{playlist._base_url%playlist.id}/tracks", session=playlist.session, parser=playlist.session.parse_track, params=params)
|
||||
|
||||
|
||||
5
tests/conftest.py
Normal file
5
tests/conftest.py
Normal file
@@ -0,0 +1,5 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add the src directory to the Python path
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))
|
||||
0
tests/unit/__init__.py
Normal file
0
tests/unit/__init__.py
Normal file
75
tests/unit/test_auth.py
Normal file
75
tests/unit/test_auth.py
Normal file
@@ -0,0 +1,75 @@
|
||||
# tests/unit/test_auth.py
|
||||
|
||||
import pytest
|
||||
import spotipy
|
||||
import tidalapi
|
||||
import yaml
|
||||
import sys
|
||||
from unittest import mock
|
||||
from spotify_to_tidal.auth import open_spotify_session, open_tidal_session, SPOTIFY_SCOPES
|
||||
|
||||
|
||||
def test_open_spotify_session(mocker):
|
||||
# Mock the SpotifyOAuth class
|
||||
mock_spotify_oauth = mocker.patch(
|
||||
"spotify_to_tidal.auth.spotipy.SpotifyOAuth", autospec=True
|
||||
)
|
||||
mock_spotify_instance = mocker.patch(
|
||||
"spotify_to_tidal.auth.spotipy.Spotify", autospec=True
|
||||
)
|
||||
|
||||
# Define a mock configuration
|
||||
mock_config = {
|
||||
"username": "test_user",
|
||||
"client_id": "test_client_id",
|
||||
"client_secret": "test_client_secret",
|
||||
"redirect_uri": "http://127.0.0.1/",
|
||||
"open_browser": True,
|
||||
}
|
||||
|
||||
# Create a mock SpotifyOAuth instance
|
||||
mock_oauth_instance = mock_spotify_oauth.return_value
|
||||
mock_oauth_instance.get_access_token.return_value = "mock_access_token"
|
||||
|
||||
# Call the function under test
|
||||
spotify_instance = open_spotify_session(mock_config)
|
||||
|
||||
# Assert that the SpotifyOAuth was called with correct parameters
|
||||
mock_spotify_oauth.assert_called_once_with(
|
||||
username="test_user",
|
||||
scope=SPOTIFY_SCOPES,
|
||||
client_id="test_client_id",
|
||||
client_secret="test_client_secret",
|
||||
redirect_uri="http://127.0.0.1/",
|
||||
requests_timeout=2,
|
||||
open_browser=True,
|
||||
)
|
||||
|
||||
# Assert that the Spotify instance was created
|
||||
mock_spotify_instance.assert_called_once_with(oauth_manager=mock_oauth_instance)
|
||||
assert spotify_instance == mock_spotify_instance.return_value
|
||||
|
||||
|
||||
def test_open_spotify_session_oauth_error(mocker):
|
||||
# Mock the SpotifyOAuth class and simulate an OAuth error
|
||||
mock_spotify_oauth = mocker.patch(
|
||||
"spotify_to_tidal.auth.spotipy.SpotifyOAuth", autospec=True
|
||||
)
|
||||
mock_spotify_oauth.return_value.get_access_token.side_effect = (
|
||||
spotipy.SpotifyOauthError("mock error")
|
||||
)
|
||||
|
||||
# Define a mock configuration
|
||||
mock_config = {
|
||||
"username": "test_user",
|
||||
"client_id": "test_client_id",
|
||||
"client_secret": "test_client_secret",
|
||||
"redirect_uri": "http://127.0.0.1/",
|
||||
}
|
||||
|
||||
# Mock sys.exit to prevent the test from exiting
|
||||
mock_sys_exit = mocker.patch("sys.exit")
|
||||
|
||||
# Call the function under test and assert sys.exit is called
|
||||
open_spotify_session(mock_config)
|
||||
mock_sys_exit.assert_called_once()
|
||||
80
tests/unit/test_cache.py
Normal file
80
tests/unit/test_cache.py
Normal file
@@ -0,0 +1,80 @@
|
||||
# tests/unit/test_cache.py
|
||||
|
||||
import pytest
|
||||
import datetime
|
||||
import sqlalchemy
|
||||
from sqlalchemy import create_engine, select
|
||||
from unittest import mock
|
||||
from spotify_to_tidal.cache import MatchFailureDatabase, TrackMatchCache
|
||||
|
||||
|
||||
# Setup an in-memory SQLite database for testing
|
||||
@pytest.fixture
|
||||
def in_memory_db():
|
||||
engine = create_engine("sqlite:///:memory:")
|
||||
return engine
|
||||
|
||||
|
||||
# Test MatchFailureDatabase
|
||||
def test_cache_match_failure(in_memory_db, mocker):
|
||||
mocker.patch(
|
||||
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
|
||||
)
|
||||
failure_db = MatchFailureDatabase()
|
||||
|
||||
track_id = "test_track"
|
||||
failure_db.cache_match_failure(track_id)
|
||||
|
||||
with failure_db.engine.connect() as connection:
|
||||
result = connection.execute(
|
||||
select(failure_db.match_failures).where(
|
||||
failure_db.match_failures.c.track_id == track_id
|
||||
)
|
||||
).fetchone()
|
||||
assert result is not None
|
||||
assert result.track_id == track_id
|
||||
|
||||
|
||||
def test_has_match_failure(in_memory_db, mocker):
|
||||
mocker.patch(
|
||||
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
|
||||
)
|
||||
failure_db = MatchFailureDatabase()
|
||||
|
||||
track_id = "test_track"
|
||||
failure_db.cache_match_failure(track_id)
|
||||
|
||||
assert failure_db.has_match_failure(track_id) is True
|
||||
|
||||
|
||||
def test_remove_match_failure(in_memory_db, mocker):
|
||||
mocker.patch(
|
||||
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
|
||||
)
|
||||
failure_db = MatchFailureDatabase()
|
||||
|
||||
track_id = "test_track"
|
||||
failure_db.cache_match_failure(track_id)
|
||||
failure_db.remove_match_failure(track_id)
|
||||
|
||||
with failure_db.engine.connect() as connection:
|
||||
result = connection.execute(
|
||||
select(failure_db.match_failures).where(
|
||||
failure_db.match_failures.c.track_id == track_id
|
||||
)
|
||||
).fetchone()
|
||||
assert result is None
|
||||
|
||||
|
||||
# Test TrackMatchCache
|
||||
def test_track_match_cache_insert():
|
||||
track_cache = TrackMatchCache()
|
||||
track_cache.insert(("spotify_id", 123))
|
||||
assert track_cache.get("spotify_id") == 123
|
||||
|
||||
|
||||
def test_track_match_cache_get():
|
||||
track_cache = TrackMatchCache()
|
||||
track_cache.insert(("spotify_id", 123))
|
||||
assert track_cache.get("spotify_id") == 123
|
||||
assert track_cache.get("nonexistent_id") is None
|
||||
Reference in New Issue
Block a user