46 Commits
0.1.0 ... main

Author SHA1 Message Date
2f762eed4b Standalone App; Works fine but only for MY OWN PLAYLISTS due to spotify dev rights :-( 2026-03-15 20:35:46 +01:00
Tim Rae
8a66f5f864 Fix bug in pyproject.toml: use tilde operator 2026-01-11 11:24:32 +01:00
Tim Rae
299cc3209f Don't specify point version of spotify and tidal APIs
So we don't have to put out a new release to update the dependency
2025-12-31 15:01:30 +01:00
Tim Rae
46771aefc8 Bump version to 1.0.6 2025-12-31 14:59:03 +01:00
Tim Rae
eb745eab6f Update tidalapi version 2025-12-31 14:54:44 +01:00
Alex Camilleri
9ff181ae6e Playlist name in txt output (#140) 2025-10-20 09:36:04 +02:00
Tim Rae
d90c5bac41 Update tidalapi dependency version to 0.8.8 (#136)
Fixes #135 #134
2025-10-16 12:22:15 +02:00
Axel
e62a8a80cb use 127.0.0.1 instead of localhost to meet new spotify URI criteria (#116) 2025-04-09 22:14:15 +02:00
Darkasf
a5afd975f0 Update readme.md (#113) 2025-03-30 06:51:13 +02:00
Johan Lindbergh
03e0396ac0 Add another check to the spotify track sanity filter (#111)
Discovered a crash when syncing a playlist with some random podcast
episode. It crashed because both ['artists'][0]['name'] and
['album']['artists'][0]['name'] was None.

I thought the episode would be caught by the track_filter, but
apparently having the 'type': 'episode' isn't reliable enough :/

Not sure if the check should be in the sanity_filter or any of
the _search_for... methods though.
2025-02-13 22:50:00 +01:00
Tim Rae
a438cda72b Bump version to 1.0.4 2024-12-04 06:09:12 +09:00
Tim Rae
2f1985a42b Check for null playlists in filter 2024-12-04 05:46:30 +09:00
Tim Rae
bcf2bbca0d Add option to do auth manually on headless server (#88) 2024-10-26 11:12:47 +02:00
Tim Rae
457da1724f Bump version to 1.0.3 2024-10-14 01:25:58 +02:00
Tim Rae
4d7c3b0ef0 Filter out tracks which don't have valid album metadata
Apply a final sanity filter to tracklist to validate assumption in matching
algorithm that the album has certain fields available.

In most situations this filter is not necessary, but occasionally we do
seem to encounter tracks that have no album metadata
2024-10-14 01:25:48 +02:00
Tim Rae
4fb702d008 Filter out podcast episodes (#83)
* Filter out podcast episodes from playlists
* Don't create empty playlists
* Bump version 1.0.2

Fixes #30
2024-09-28 08:34:58 +02:00
Tim Rae
693dcd110f Update version to 1.0.1 2024-09-12 16:59:45 +02:00
Tim Rae
17d60c019d Use user id from spotify session instead of config file
Fixes #80
2024-09-11 19:18:13 +02:00
mannp
7148053ad4 Update pyproject.toml (#79) 2024-09-09 19:03:56 +02:00
Tim Rae
64ea6f3df0 Bump version to 1.0.0
There have been some pretty significant changes recently
so figured it's time for a major version bump
2024-09-09 12:28:32 +02:00
Tim Rae
da1bc2eff9 Use utf-8 encoding for tracks not found 2024-09-09 10:02:22 +02:00
Tim Rae
5e3645319f Use current_user_playlists instead of user_playlists 2024-08-19 07:57:35 +02:00
Tim Rae
af0fd7a2cf Add retry functionality for playlist tracks 2024-08-19 07:57:35 +02:00
som
4f023d024b creates file for songs not found (#74) 2024-08-15 09:58:45 +02:00
Arnav Banerjee
54bb7097a7 fixed failing test (#68) 2024-06-11 20:32:31 +02:00
xerexoded
692939cd0e added circle CI config with basic unit tests 2024-06-10 19:29:13 +02:00
Tim Rae
ced8db44fe Fetch Tidal playlists/tracks asynchronously in chunks 2024-06-10 10:00:57 +02:00
Tim Rae
193e74b775 Never erase tracks from Tidal favorites 2024-06-10 09:32:44 +02:00
Tim Rae
7a1343ac91 Add config param for sync_favorites_default and cleanup command line arg 2024-06-10 09:32:44 +02:00
Tim Rae
6d6a4fe43e Fix favorite tracks getting cutoff at 1000 2024-06-10 09:32:44 +02:00
Tim Rae
1cdc62fd0c Cleanup partitioning of code used in favorites / playlist list 2024-06-10 09:32:44 +02:00
Cobal
54526a0306 feat: Sync favorites tracks to Tidal 2024-06-10 09:32:44 +02:00
Tim Rae
fab154851b Don't cache Tidal tracks that are unavailable
These would generally fail to be added to the Tidal playlist and could
cause playlists to be unnecessarily wiped every time the playlist is
synced
2024-06-09 12:28:52 +02:00
Raphael
b6340790ca Print duplicates that occurred while syncing (#64) 2024-06-09 10:50:10 +02:00
Tim Rae
8692624a8c Bump version to 0.1.4 2024-06-08 20:50:25 +02:00
Tim Rae
dc959f2657 Fix failure cache not being populated 2024-06-08 20:48:26 +02:00
Tim Rae
d9312d89dd Handle case where album contains incorrect metadata 2024-06-08 17:56:06 +02:00
Tim Rae
1a7ff4f083 Performance optimisation: Don't query tracks for uninteresting albums
The call to tidal_album.tracks() can take a very long time to execute
which can significantly slow down the search in certain cases
2024-06-08 16:41:15 +02:00
Tim Rae
0c859cc9aa Bump version to 0.1.3 2024-06-08 12:55:09 +02:00
Tim Rae
bb0f3cffd0 Fix rate limit accuracy
Batching multiple updates to the leaky bucket at a fixed interval
improves the accuracy of the rate limiter. Previously the rate would
drop substantially over the course of the sync operation.
2024-06-08 12:54:27 +02:00
Adria Jimenez
ecc642ba7d fix: sync first playlists page
This commit: 1e8366a0e8 broke the loading of playlists. The first results coming back from the Spotify API were not being added to the playlists array.
2024-06-06 13:33:40 +02:00
Tim Rae
3e9b2ef0ec Update readme.md 2024-06-05 09:22:24 +02:00
Tim Rae
a16f764bee Bump version to 0.1.2 2024-06-03 23:38:21 +02:00
Tim Rae
c1956d19cc Fix bug where occasionally wrong track is inserted 2024-06-03 23:38:21 +02:00
Tim Rae
faaf103d23 Bump version to 0.1.1 2024-06-03 22:32:28 +02:00
Tim Rae
a2e62ea20d Fix error due to missing type 2024-06-03 22:32:28 +02:00
18 changed files with 1045 additions and 94 deletions

38
.circleci/config.yml Normal file
View File

@@ -0,0 +1,38 @@
version: 2.1
jobs:
setup:
docker:
- image: circleci/python:3.10
steps:
- checkout
- run:
name: Install dependencies
command: |
python -m pip install --upgrade pip
pip install -e .
pip install pytest pytest-mock
test:
docker:
- image: circleci/python:3.10
steps:
- checkout
- run:
name: Install dependencies
command: |
python -m pip install --upgrade pip
pip install -e .
pip install pytest pytest-mock
- run:
name: Run tests
command: pytest
workflows:
version: 2
test_workflow:
jobs:
- setup
- test:
requires:
- setup

4
.gitignore vendored
View File

@@ -3,6 +3,7 @@ config.yml
config.yaml
.cache*
.session.yml
songs not found.txt
# Byte-compiled / optimized / DLL files
__pycache__/
@@ -30,3 +31,6 @@ venv/
.mypy_cache/
.dmypy.json
dmypy.json
# Visual Studio Code
.vscode/

11
Launch Spotify→Tidal.command Executable file
View File

@@ -0,0 +1,11 @@
#!/bin/zsh
# Double-click this file in Finder to launch the Spotify→Tidal sync app.
# Change to the project directory (same folder as this script)
cd "$(dirname "$0")"
# Activate the virtual environment
source .venv/bin/activate
# Launch Streamlit (opens browser automatically)
streamlit run app.py

417
app.py Normal file
View File

@@ -0,0 +1,417 @@
import json
import math
import yaml
import asyncio
import pandas as pd
import streamlit as st
from pathlib import Path
import spotipy
from spotify_to_tidal import auth as _auth
from spotify_to_tidal import sync as _sync
from spotify_to_tidal.tidalapi_patch import get_all_playlists
# ── Constants ─────────────────────────────────────────────────────────────────
SESSION_FILE = Path("session.json")
NOT_FOUND_FILE = Path("songs not found.txt")
# ── Session persistence ───────────────────────────────────────────────────────
def load_session() -> dict:
if SESSION_FILE.exists():
try:
return json.loads(SESSION_FILE.read_text())
except Exception:
pass
return {"watched": [], "external_playlists": []}
def save_session(data: dict):
SESSION_FILE.write_text(json.dumps(data, indent=2))
def load_watched() -> set:
return set(load_session().get("watched", []))
def save_watched(ids: set):
data = load_session()
data["watched"] = sorted(ids)
save_session(data)
def load_external_playlists() -> list:
return load_session().get("external_playlists", [])
def upsert_external_playlist(info: dict):
data = load_session()
ext = [p for p in data.get("external_playlists", []) if p.get("id") != info["id"]]
ext.append(info)
data["external_playlists"] = ext
save_session(data)
def remove_external_playlist(pid: str):
data = load_session()
data["external_playlists"] = [p for p in data.get("external_playlists", []) if p.get("id") != pid]
save_session(data)
# ── Not-found parser ──────────────────────────────────────────────────────────
def parse_not_found() -> dict:
"""Returns {playlist_name: [(artist, title), ...]}"""
result = {}
if not NOT_FOUND_FILE.exists():
return result
current = None
for raw in NOT_FOUND_FILE.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("=="):
continue
if line.startswith("Playlist:"):
current = line[len("Playlist:"):].strip()
result.setdefault(current, [])
elif current is not None and ": " in line:
_, rest = line.split(": ", 1)
artist, title = (rest.split(" - ", 1) if " - " in rest else ("", rest))
result[current].append((artist.strip(), title.strip()))
return result
# ── Page config ───────────────────────────────────────────────────────────────
st.set_page_config(page_title="Spotify → Tidal Sync", layout="wide")
st.title("🎵 Spotify → Tidal Sync")
# ── Auth ──────────────────────────────────────────────────────────────────────
@st.cache_resource
def get_sessions():
try:
with open("config.yml", "r") as f:
config = yaml.safe_load(f)
except Exception as e:
st.error(f"Failed to load config.yml: {e}"); st.stop()
try:
sp = _auth.open_spotify_session(config["spotify"])
td = _auth.open_tidal_session()
if not td.check_login():
st.error("Could not connect to Tidal."); st.stop()
return config, sp, td
except Exception as e:
st.error(f"Authentication failed: {e}"); st.stop()
config, spotify_session, tidal_session = get_sessions()
# ── Data fetchers ─────────────────────────────────────────────────────────────
@st.cache_data
def fetch_my_playlists():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(_sync.get_playlists_from_spotify(spotify_session, config))
loop.close()
return result
@st.cache_data(show_spinner=False)
def fetch_playlist_tracks(pid: str, name: str) -> tuple:
"""Returns (tracks: list, error: str|None). Uses direct API — no sys.exit retry wrapper."""
try:
all_tracks = []
result = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=0)
if result is None:
return [], "Empty response from Spotify."
def _extract(r):
return [
item["track"] for item in (r.get("items") or [])
if item.get("track") and item["track"].get("id")
]
all_tracks.extend(_extract(result))
total = result.get("total", 0)
if result.get("next") and total > 50:
for offset in range(50, total, 50):
page = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=offset)
if page:
all_tracks.extend(_extract(page))
return all_tracks, None
except spotipy.SpotifyException as e:
if e.http_status == 403:
return [], "🔒 Access denied (403) — this playlist may be private or collaborative."
return [], f"Spotify error {e.http_status}: {e.msg}"
except Exception as e:
return [], f"Error loading tracks: {e}"
@st.cache_data(show_spinner=False)
def fetch_single_playlist(pid: str) -> dict | None:
try:
return spotify_session.playlist(pid, fields="id,name,description,tracks.total,owner")
except Exception as e:
print(f"[ERROR] fetch_single_playlist: {e}")
return None
# ── Sync helper ───────────────────────────────────────────────────────────────
def run_sync_playlists(playlists: list):
async def _do():
tidal_map = {p.name: p for p in await get_all_playlists(tidal_session.user)}
total = len(playlists)
bar = st.progress(0, text=f"Starting sync of {total} playlist(s)…")
skipped = []
for i, sp in enumerate(playlists):
bar.progress(i / total, text=f"[{i+1}/{total}] {sp['name']}")
try:
await _sync.sync_playlist(spotify_session, tidal_session, sp, tidal_map.get(sp["name"]), config)
except RuntimeError as e:
skipped.append((sp["name"], str(e)))
print(f"[SKIP] {sp['name']}: {e}")
bar.progress(1.0, text="✅ Done!")
return skipped
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
skipped = loop.run_until_complete(_do())
loop.close()
return skipped or []
# ── Load state ────────────────────────────────────────────────────────────────
with st.spinner("Fetching your Spotify playlists…"):
spotify_playlists = fetch_my_playlists()
if not spotify_playlists:
st.warning("No Spotify playlists found!"); st.stop()
watched_ids = load_watched()
not_found = parse_not_found()
total_nf = sum(len(v) for v in not_found.values())
# ── Tabs ──────────────────────────────────────────────────────────────────────
tab_my, tab_ext, tab_nf = st.tabs([
f"🎵 My Playlists ({len(spotify_playlists)})",
"🔗 External Playlists",
f"❌ Not Found ({total_nf} tracks)",
])
# ════════════════════════════════════════════════════════
# TAB 1 MY PLAYLISTS (expandable rows)
# ════════════════════════════════════════════════════════
with tab_my:
# Collect playlists selected for sync via session_state checkboxes
sync_btn_col, wtch_btn_col, _ = st.columns([2, 2, 4])
# We need to render the list first so state keys exist, then read them
# ── Column headers ────────────────────────────────
hdr = st.columns([6, 1, 1, 1])
hdr[0].markdown("**Playlist**")
hdr[1].markdown("**Tracks**")
hdr[2].markdown("**Watch**")
hdr[3].markdown("**Sync**")
st.divider()
current_watched = load_watched() # fresh from disk
# Sort: watched first, then rest
sorted_playlists = (
[p for p in spotify_playlists if p["id"] in current_watched] +
[p for p in spotify_playlists if p["id"] not in current_watched]
)
showed_watched_header = False
showed_rest_header = False
for p in sorted_playlists:
pid = p["id"]
name = p["name"]
nf_list = not_found.get(name, [])
is_watched = pid in current_watched
tracks_key = f"tracks_{pid}"
# Section headers
if is_watched and not showed_watched_header:
st.markdown("### 👁 Watched Playlists")
showed_watched_header = True
elif not is_watched and not showed_rest_header:
if showed_watched_header:
st.divider()
st.markdown("### 🎵 All Playlists")
showed_rest_header = True
# Use cached tracks from session_state if already loaded
cached_tracks: list | None = st.session_state.get(tracks_key)
track_count = len(cached_tracks) if cached_tracks is not None else None
# Build expander label
count_str = str(track_count) if track_count is not None else ""
prefix = "" if is_watched else ""
warn = " ⚠️" if nf_list else ""
label = f"{prefix}{name}{count_str} tracks{warn}"
with st.expander(label, expanded=False):
left, right = st.columns([3, 1])
with right:
# Watch toggle
new_watch = st.checkbox("👁 Watch", value=is_watched, key=f"watch_{pid}")
if new_watch != is_watched:
if new_watch:
current_watched.add(pid)
else:
current_watched.discard(pid)
save_watched(current_watched)
st.rerun()
# Sync checkbox (persists in session_state automatically)
st.checkbox("▶ Add to sync", key=f"sync_{pid}")
with left:
if cached_tracks is None:
if st.button("🔍 Load tracks", key=f"load_{pid}"):
with st.spinner(f"Loading '{name}'"):
tracks, err = fetch_playlist_tracks(pid, name)
st.session_state[tracks_key] = tracks
if err:
st.session_state[f"tracks_err_{pid}"] = err
st.rerun()
else:
err = st.session_state.get(f"tracks_err_{pid}")
if err:
st.warning(err)
elif cached_tracks:
nf_titles = {title for _, title in nf_list}
rows = [
{
"": "" if t["name"] in nf_titles else "",
"Title": t["name"],
"Artist": ", ".join(a["name"] for a in t.get("artists", [])),
}
for t in cached_tracks
]
st.dataframe(
pd.DataFrame(rows),
hide_index=True,
use_container_width=True,
height=min(40 + len(rows) * 35, 300),
column_config={
"": st.column_config.TextColumn("", width="small"),
"Title": st.column_config.TextColumn("Title"),
"Artist": st.column_config.TextColumn("Artist"),
},
)
if nf_list:
st.caption(f"⚠️ {len(nf_list)} track(s) not found on Tidal")
else:
st.info("No tracks found for this playlist.")
st.divider()
# ── Gather selected playlists & action buttons ────
sel_pls = [p for p in spotify_playlists if st.session_state.get(f"sync_{p['id']}", False)]
wtch_pls = [p for p in spotify_playlists if p["id"] in load_watched()]
with sync_btn_col:
if st.button(f"▶ Sync Selected ({len(sel_pls)})", disabled=not sel_pls, type="primary", width="stretch"):
skipped = run_sync_playlists(sel_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(sel_pls) - len(skipped)
if synced:
st.success(f"{synced} playlist(s) synced!")
st.balloons()
with wtch_btn_col:
if st.button(f"👁 Sync Watched ({len(wtch_pls)})", disabled=not wtch_pls, width="stretch"):
skipped = run_sync_playlists(wtch_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(wtch_pls) - len(skipped)
if synced:
st.success(f"{synced} watched playlist(s) synced!")
st.balloons()
# ════════════════════════════════════════════════════════
# TAB 2 EXTERNAL PLAYLISTS
# ════════════════════════════════════════════════════════
with tab_ext:
st.subheader("Add a friend's playlist")
url_input = st.text_input(
"Spotify URL or ID",
placeholder="https://open.spotify.com/playlist/… or 3ktReNcDeGj94W59UXHU9R",
label_visibility="collapsed",
)
if st.button(" Add Playlist", type="primary"):
pid = url_input.strip()
if "spotify.com/playlist/" in pid:
pid = pid.split("spotify.com/playlist/")[-1].split("?")[0].strip()
if pid:
with st.spinner("Looking up playlist…"):
info = fetch_single_playlist(pid)
if info:
tf = info.get("tracks") or {}
upsert_external_playlist({
"id": info["id"],
"name": info["name"],
"description": info.get("description", ""),
"total_tracks": tf.get("total", 0) if isinstance(tf, dict) else 0,
"owner": (info.get("owner") or {}).get("display_name", ""),
})
st.success(f"✅ Added **{info['name']}**")
st.rerun()
else:
st.error("Playlist not found. Is it public? Check the URL/ID.")
else:
st.warning("Please enter a URL or ID.")
external = load_external_playlists()
if external:
st.subheader(f"Saved ({len(external)})")
ext_rows = [{"▶ Sync": False, "Playlist": p["name"], "Owner": p.get("owner",""), "Tracks": p.get("total_tracks",0), "ID": p["id"]} for p in external]
edit_ext = st.data_editor(
pd.DataFrame(ext_rows),
column_config={
"▶ Sync": st.column_config.CheckboxColumn("▶ Sync", width="small"),
"Playlist": st.column_config.TextColumn("Playlist", disabled=True),
"Owner": st.column_config.TextColumn("Owner", disabled=True, width="medium"),
"Tracks": st.column_config.NumberColumn("Tracks", disabled=True, width="small"),
"ID": st.column_config.TextColumn("Spotify ID", disabled=True, width="medium"),
},
hide_index=True, width="stretch", key="ext_editor",
)
ext_sel = [p for p in external if p["id"] in set(edit_ext[edit_ext["▶ Sync"]]["ID"].tolist())]
if st.button(f"▶ Sync Selected ({len(ext_sel)})", disabled=not ext_sel, type="primary", width="stretch", key="btn_sync_ext"):
with st.spinner("Fetching playlist details…"):
full_pls = [spotify_session.playlist(p["id"]) for p in ext_sel]
skipped = run_sync_playlists(full_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(full_pls) - len(skipped)
if synced:
st.success(f"{synced} external playlist(s) synced!")
st.divider()
rm = st.selectbox("Remove", ["— select —"] + [p["name"] for p in external], key="rm_ext")
if rm != "— select —" and st.button("🗑 Remove", type="secondary"):
rid = next((p["id"] for p in external if p["name"] == rm), None)
if rid:
remove_external_playlist(rid)
fetch_single_playlist.clear()
st.rerun()
else:
st.info("No external playlists added yet.")
# ════════════════════════════════════════════════════════
# TAB 3 NOT FOUND
# ════════════════════════════════════════════════════════
with tab_nf:
if not not_found:
st.success("✅ No unmatched tracks yet — run a sync first.")
else:
st.info(f"**{total_nf}** track(s) across **{len(not_found)}** playlist(s) couldn't be matched on Tidal.")
flt = st.selectbox("Filter by playlist", ["All playlists"] + sorted(not_found.keys()), key="nf_filter")
nf_rows = [
{"Playlist": pl, "Artist": a, "Title": t}
for pl, items in sorted(not_found.items())
if flt in ("All playlists", pl)
for a, t in items
]
if nf_rows:
st.dataframe(pd.DataFrame(nf_rows), hide_index=True, use_container_width=True,
height=min(80 + len(nf_rows) * 35, 500))
else:
st.info("No entries for this filter.")
# ── Liked Songs (opt-in) ──────────────────────────────────────────────────────
with st.expander("⭐ Sync Liked Songs (optional)", expanded=False):
st.write("Sync your Spotify 'Liked Songs' to your Tidal favorites.")
if st.button("Sync Liked Songs Now", type="secondary"):
with st.spinner("Syncing liked songs…"):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(_sync.sync_favorites(spotify_session, tidal_session, config))
loop.close()
st.success("✅ Liked songs synced!")

View File

@@ -2,7 +2,8 @@ spotify:
client_id: your_client_id
client_secret: your_client_secret
username: your_spotify_username
redirect_uri: http://localhost:8888/callback
redirect_uri: http://127.0.0.1:8888/callback
open_browser: True # Set to False if using a headless server environment
# uncomment this block if you want to only sync specific playlist IDs
@@ -14,6 +15,11 @@ spotify:
#excluded_playlists:
# - spotify:playlist:1ABCDEqsABCD6EaABCDa0a
# default setting for syncing favorites when no command line arguments are provided
# - when true: favorites will be synced by default (overriden when any command line arg provided)
# - when false: favorites can only be synced manually via --sync-favorites argument
sync_favorites_default: true
# increasing these parameters should increase the search speed, while decreasing reduces likelihood of 429 errors
max_concurrency: 10 # max concurrent connections at any given time
rate_limit: 12 # max sustained connections per second
rate_limit: 10 # max sustained connections per second

113
interactive_sync.py Normal file
View File

@@ -0,0 +1,113 @@
import yaml
import sys
import asyncio
from typing import List, Dict, Any
from spotify_to_tidal import auth as _auth
from spotify_to_tidal import sync as _sync
from spotify_to_tidal.tidalapi_patch import get_all_playlists
def print_menu(playlists: List[Dict[str, Any]]):
print("\n" + "="*50)
print(f"🎵 Spotify Playlists Found ({len(playlists)} total) 🎵")
print("="*50)
for i, p in enumerate(playlists):
track_count = p.get('tracks', {}).get('total', 'Unknown')
print(f"[{i+1:2d}] {p['name']:<35} ({track_count:>3} tracks) | ID: {p['id']}")
print("="*50)
print("Enter the numbers of the playlists you want to sync, separated by commas (e.g., 1, 4, 12).")
print("Enter 'ALL' to sync all playlists.")
print("Enter 'Q' to quit.")
async def main_async():
# 1. Load config
try:
with open('config.yml', 'r') as f:
config = yaml.safe_load(f)
except Exception as e:
sys.exit(f"Failed to load config.yml: {e}")
# 2. Authenticate
print("Authenticating with Spotify...")
spotify_session = _auth.open_spotify_session(config['spotify'])
print("Authenticating with Tidal...")
tidal_session = _auth.open_tidal_session()
if not tidal_session.check_login():
sys.exit("Could not connect to Tidal")
# 3. Fetch all Spotify user playlists
print("\nFetching Spotify playlists...")
spotify_playlists = await _sync.get_playlists_from_spotify(spotify_session, config)
if not spotify_playlists:
print("No Spotify playlists found!")
return
# 4. Interactive menu loop
while True:
print_menu(spotify_playlists)
choice = input("\nYour choice: ").strip().lower()
if choice == 'q':
print("Exiting...")
return
selected_playlists = []
if choice == 'all':
selected_playlists = spotify_playlists
print(f"Selected ALL {len(selected_playlists)} playlists for synchronization.")
break
else:
try:
# Parse comma-separated indices
indices = [int(x.strip()) - 1 for x in choice.split(',')]
for idx in indices:
if 0 <= idx < len(spotify_playlists):
selected_playlists.append(spotify_playlists[idx])
else:
print(f"Warning: Number {idx + 1} is out of range, ignoring.")
if not selected_playlists:
print("No valid playlists selected. Please try again.")
continue
print(f"Selected {len(selected_playlists)} playlists for synchronization.")
for p in selected_playlists:
print(f" - {p['name']}")
confirm = input("Proceed? (Y/n): ").strip().lower()
if confirm in ('', 'y', 'yes'):
break
except ValueError:
print("Invalid input. Please enter numbers separated by commas (e.g. 1, 2, 3) or 'ALL'.")
# 5. Execute the sync for the selected playlists
if not selected_playlists:
return
print("\n=== Starting Synchronization ===")
tidal_playlists = {p.name: p for p in await get_all_playlists(tidal_session.user)}
mappings_to_sync = []
for sp_playlist in selected_playlists:
mappings_to_sync.append(_sync.pick_tidal_playlist_for_spotify_playlist(sp_playlist, tidal_playlists))
for index, (sp_playlist, td_playlist) in enumerate(mappings_to_sync):
print(f"\n[{index+1}/{len(mappings_to_sync)}] Syncing playlist: {sp_playlist['name']}")
await _sync.sync_playlist(spotify_session, tidal_session, sp_playlist, td_playlist, config)
print("\n=== Synchronization Complete ===")
# Optionally sync favorites if enabled in config
if config.get('sync_favorites_default', True):
fav_confirm = input("\nWould you like to sync your 'Liked Songs' as well? (Y/n): ").strip().lower()
if fav_confirm in ('', 'y', 'yes'):
await _sync.sync_favorites(spotify_session, tidal_session, config)
def main():
asyncio.run(main_async())
if __name__ == "__main__":
main()

View File

@@ -4,15 +4,17 @@ build-backend = "setuptools.build_meta"
[project]
name = "spotify_to_tidal"
version = "0.1.0"
version = "1.0.7"
requires-python = ">= 3.10"
dependencies = [
"spotipy~=2.21.0",
"tidalapi==0.7.6",
"spotipy~=2.24",
"tidalapi~=0.8.10",
"pyyaml~=6.0",
"tqdm~=4.64",
"sqlalchemy~=2.0"
"sqlalchemy~=2.0",
"pytest~=8.0",
"pytest-mock~=3.8"
]
[tools.setuptools.packages."spotify_to_tidal"]

6
pytest.ini Normal file
View File

@@ -0,0 +1,6 @@
[pytest]
addopts = --maxfail=1 --disable-warnings
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*

View File

@@ -1,4 +1,4 @@
A command line tool for importing your Spotify playlists into Tidal
A command line tool for importing your Spotify playlists into Tidal. Due to various performance optimisations, it is particularly suited for periodic synchronisation of very large collections.
Installation
-----------
@@ -19,6 +19,7 @@ Setup
Usage
----
To synchronize all of your Spotify playlists with your Tidal account run the following from the project root directory
Windows ignores python module paths by default, but you can run them using `python3 -m spotify_to_tidal`
```bash
spotify_to_tidal
@@ -30,6 +31,12 @@ You can also just synchronize a specific playlist by doing the following:
spotify_to_tidal --uri 1ABCDEqsABCD6EaABCDa0a # accepts playlist id or full playlist uri
```
or sync just your 'Liked Songs' with:
```bash
spotify_to_tidal --sync-favorites
```
See example_config.yml for more configuration options, and `spotify_to_tidal --help` for more options.
---

28
session.json Normal file
View File

@@ -0,0 +1,28 @@
{
"watched": [
"2sLEYLo3a6dDHbl4ypUkcl",
"3Sjb6ZlsPxIpzOMZVoq1pW",
"3eNcChQ2KssR8QjuV7KUEc",
"44Shnwiw8xk3UmKf7Unp6o",
"4ISU1ED9Vx8SH8o3JQVttw",
"4iVWLq7tdRfLzTbKsYhM56",
"5d49VR3snvfwyAJxJgB20s",
"7KG4kWpAQMhJxudcO42YkG"
],
"external_playlists": [
{
"id": "3ysFr4SgWB6IfzKgTij9Kj",
"name": "Down Down Down",
"description": "",
"total_tracks": 0,
"owner": "Lino Toran Jenner"
},
{
"id": "4YqSwVf1etm2ILxOwifQyv",
"name": "SunSet [Suppe]",
"description": "",
"total_tracks": 0,
"owner": "Lino Toran Jenner"
}
]
}

View File

@@ -9,6 +9,7 @@ def main():
parser = argparse.ArgumentParser()
parser.add_argument('--config', default='config.yml', help='location of the config file')
parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config')
parser.add_argument('--sync-favorites', action=argparse.BooleanOptionalAction, help='synchronize the favorites')
args = parser.parse_args()
with open(args.config, 'r') as f:
@@ -22,15 +23,23 @@ def main():
if args.uri:
# if a playlist ID is explicitly provided as a command line argument then use that
spotify_playlist = spotify_session.playlist(args.uri)
tidal_playlists = _sync.get_tidal_playlists_dict(tidal_session)
tidal_playlists = _sync.get_tidal_playlists_wrapper(tidal_session)
tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists)
_sync.sync_list(spotify_session, tidal_session, [tidal_playlist], config)
_sync.sync_playlists_wrapper(spotify_session, tidal_session, [tidal_playlist], config)
sync_favorites = args.sync_favorites # only sync favorites if command line argument explicitly passed
elif args.sync_favorites:
sync_favorites = True # sync only the favorites
elif config.get('sync_playlists', None):
# if the config contains a sync_playlists list of mappings then use that
_sync.sync_list(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True)
else:
# otherwise just use the user playlists in the Spotify account
_sync.sync_list(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
# otherwise sync all the user playlists in the Spotify account and favorites unless explicitly disabled
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True)
if sync_favorites:
_sync.sync_favorites_wrapper(spotify_session, tidal_session, config)
if __name__ == '__main__':
main()

View File

@@ -11,13 +11,16 @@ __all__ = [
'open_tidal_session'
]
SPOTIFY_SCOPES = 'playlist-read-private playlist-read-collaborative user-library-read'
def open_spotify_session(config) -> spotipy.Spotify:
credentials_manager = spotipy.SpotifyOAuth(username=config['username'],
scope='playlist-read-private',
scope=SPOTIFY_SCOPES,
client_id=config['client_id'],
client_secret=config['client_secret'],
redirect_uri=config['redirect_uri'],
requests_timeout=2)
requests_timeout=2,
open_browser=config.get('open_browser', True))
try:
credentials_manager.get_access_token(as_dict=False)
except spotipy.SpotifyOauthError:

View File

@@ -2,16 +2,19 @@
import asyncio
from .cache import failure_cache, track_match_cache
import datetime
from difflib import SequenceMatcher
from functools import partial
from typing import List, Sequence, Set, Mapping
from typing import Callable, List, Sequence, Set, Mapping
import math
import requests
import sys
import spotipy
import tidalapi
from .tidalapi_patch import add_multiple_tracks_to_playlist, set_tidal_playlist
from .tidalapi_patch import add_multiple_tracks_to_playlist, clear_tidal_playlist, get_all_favorites, get_all_playlists, get_all_playlist_tracks
import time
from tqdm.asyncio import tqdm as atqdm
from tqdm import tqdm
import traceback
import unicodedata
import math
@@ -50,7 +53,7 @@ def name_match(tidal_track, spotify_track) -> bool:
simple_spotify_track = simple(spotify_track['name'].lower()).split('feat.')[0].strip()
return simple_spotify_track in tidal_track.name.lower() or normalize(simple_spotify_track) in normalize(tidal_track.name.lower())
def artist_match(tidal_track: tidalapi.Track, spotify_track) -> bool:
def artist_match(tidal: tidalapi.Track | tidalapi.Album, spotify) -> bool:
def split_artist_name(artist: str) -> Sequence[str]:
if '&' in artist:
return artist.split('&')
@@ -59,9 +62,9 @@ def artist_match(tidal_track: tidalapi.Track, spotify_track) -> bool:
else:
return [artist]
def get_tidal_artists(tidal_track: tidalapi.Track, do_normalize=False) -> Set[str]:
def get_tidal_artists(tidal: tidalapi.Track | tidalapi.Album, do_normalize=False) -> Set[str]:
result: list[str] = []
for artist in tidal_track.artists:
for artist in tidal.artists:
if do_normalize:
artist_name = normalize(artist.name)
else:
@@ -69,9 +72,9 @@ def artist_match(tidal_track: tidalapi.Track, spotify_track) -> bool:
result.extend(split_artist_name(artist_name))
return set([simple(x.strip().lower()) for x in result])
def get_spotify_artists(spotify_track: t_spotify.SpotifyTrack, do_normalize=False) -> Set[str]:
def get_spotify_artists(spotify, do_normalize=False) -> Set[str]:
result: list[str] = []
for artist in spotify_track['artists']:
for artist in spotify['artists']:
if do_normalize:
artist_name = normalize(artist['name'])
else:
@@ -80,32 +83,42 @@ def artist_match(tidal_track: tidalapi.Track, spotify_track) -> bool:
return set([simple(x.strip().lower()) for x in result])
# There must be at least one overlapping artist between the Tidal and Spotify track
# Try with both un-normalized and then normalized
if get_tidal_artists(tidal_track).intersection(get_spotify_artists(spotify_track)) != set():
if get_tidal_artists(tidal).intersection(get_spotify_artists(spotify)) != set():
return True
return get_tidal_artists(tidal_track, True).intersection(get_spotify_artists(spotify_track, True)) != set()
return get_tidal_artists(tidal, True).intersection(get_spotify_artists(spotify, True)) != set()
def match(tidal_track, spotify_track) -> bool:
if not spotify_track['id']: return False
return isrc_match(tidal_track, spotify_track) or (
duration_match(tidal_track, spotify_track)
and name_match(tidal_track, spotify_track)
and artist_match(tidal_track, spotify_track)
)
def test_album_similarity(spotify_album, tidal_album, threshold=0.6):
return SequenceMatcher(None, simple(spotify_album['name']), simple(tidal_album.name)).ratio() >= threshold and artist_match(tidal_album, spotify_album)
async def tidal_search(spotify_track, rate_limiter, tidal_session: tidalapi.Session) -> tidalapi.Track | None:
def _search_for_track_in_album():
# search for album name and first album artist
if 'album' in spotify_track and 'artists' in spotify_track['album'] and len(spotify_track['album']['artists']):
album_result = tidal_session.search(simple(spotify_track['album']['name']) + " " + simple(spotify_track['album']['artists'][0]['name']), models=[tidalapi.album.Album])
query = simple(spotify_track['album']['name']) + " " + simple(spotify_track['album']['artists'][0]['name'])
album_result = tidal_session.search(query, models=[tidalapi.album.Album])
for album in album_result['albums']:
if album.num_tracks >= spotify_track['track_number'] and test_album_similarity(spotify_track['album'], album):
album_tracks = album.tracks()
if len(album_tracks) >= spotify_track['track_number']:
if len(album_tracks) < spotify_track['track_number']:
assert( not len(album_tracks) == album.num_tracks ) # incorrect metadata :(
continue
track = album_tracks[spotify_track['track_number'] - 1]
if match(track, spotify_track):
failure_cache.remove_match_failure(spotify_track['id'])
return track
def _search_for_standalone_track():
# if album search fails then search for track name and first artist
for track in tidal_session.search(simple(spotify_track['name']) + ' ' + simple(spotify_track['artists'][0]['name']), models=[tidalapi.media.Track])['tracks']:
query = simple(spotify_track['name']) + ' ' + simple(spotify_track['artists'][0]['name'])
for track in tidal_session.search(query, models=[tidalapi.media.Track])['tracks']:
if match(track, spotify_track):
failure_cache.remove_match_failure(spotify_track['id'])
return track
@@ -117,25 +130,19 @@ async def tidal_search(spotify_track, rate_limiter, tidal_session: tidalapi.Sess
track_search = await asyncio.to_thread( _search_for_standalone_track )
if track_search:
return track_search
return None
# if none of the search modes succeeded then store the track id to the failure cache
failure_cache.cache_match_failure(spotify_track['id'])
def get_tidal_playlists_dict(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
# a dictionary of name --> playlist
print("Loading Tidal playlists... This may take some time.")
tidal_playlists = tidal_session.user.playlists()
output = {}
for playlist in tidal_playlists:
output[playlist.name] = playlist
return output
async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
# utility to repeat calling the function up to 5 times if an exception is thrown
try:
return await function(*args, **kwargs)
except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException) as e:
except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException, spotipy.exceptions.SpotifyException) as e:
# 403 Forbidden: retrying will never help — fail fast to avoid hanging
if isinstance(e, spotipy.exceptions.SpotifyException) and e.http_status == 403:
raise RuntimeError(f"Access denied (403) for playlist — it may be private or collaborative: {e}") from e
if remaining:
print(f"{str(e)} occurred, retrying {remaining} times")
else:
@@ -154,36 +161,51 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
time.sleep(sleep_schedule.get(remaining, 1))
return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs)
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist: t_spotify.SpotifyPlaylist):
def _get_tracks_from_spotify_playlist(offset: int, spotify_session: spotipy.Spotify, playlist_id: str):
fields="next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
return spotify_session.playlist_tracks(playlist_id, fields, offset=offset)
async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[dict]:
output = []
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
results = _get_tracks_from_spotify_playlist( 0, spotify_session, spotify_playlist["id"] )
output.extend([r['track'] for r in results['items'] if r['track'] is not None])
results = fetch_function(0)
output.extend([item.get('track') or item.get('item') for item in results['items'] if item.get('track') or item.get('item')])
# get all the remaining tracks in parallel
# Get all the remaining tracks in parallel
if results['next']:
offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))]
extra_results = await atqdm.gather( *[asyncio.to_thread(_get_tracks_from_spotify_playlist, offset, spotify_session=spotify_session, playlist_id=spotify_playlist["id"]) for offset in offsets ] )
extra_results = await atqdm.gather(
*[asyncio.to_thread(fetch_function, offset) for offset in offsets],
desc="Fetching additional data chunks"
)
for extra_result in extra_results:
output.extend([r['track'] for r in extra_result['items'] if r['track'] is not None])
output.extend([item.get('track') or item.get('item') for item in extra_result['items'] if item.get('track') or item.get('item')])
return output
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str):
return spotify_session.playlist_tracks(playlist_id=playlist_id, offset=offset)
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
items = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, lambda offset: _get_tracks_from_spotify_playlist(offset=offset, playlist_id=spotify_playlist["id"]))
track_filter = lambda item: item.get('type', 'track') == 'track' # type may be 'episode' also
sanity_filter = lambda item: ('album' in item
and 'name' in item['album']
and 'artists' in item['album']
and len(item['album']['artists']) > 0
and item['album']['artists'][0]['name'] is not None)
return list(filter(sanity_filter, filter(track_filter, items)))
def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]):
""" Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """
def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack):
for idx, tidal_track in list(enumerate(tidal_tracks)):
if match(tidal_track, spotify_track):
if tidal_track.available and match(tidal_track, spotify_track):
track_match_cache.insert((spotify_track['id'], tidal_track.id))
tidal_tracks.pop(idx)
return
def _populate_one_track_from_tidal(tidal_track: tidalapi.Track):
for idx, spotify_track in list(enumerate(spotify_tracks)):
if match(tidal_track, spotify_track):
if tidal_track.available and match(tidal_track, spotify_track):
track_match_cache.insert((spotify_track['id'], tidal_track.id))
spotify_tracks.pop(idx)
return
@@ -199,9 +221,8 @@ def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack]
for track in spotify_tracks:
_populate_one_track_from_spotify(track)
def get_new_tracks_from_spotify_playlist(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> list[t_spotify.SpotifyTrack]:
''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures '''
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack]) -> List[t_spotify.SpotifyTrack]:
''' Extracts only the tracks that have not already been seen in our Tidal caches '''
results = []
for spotify_track in spotify_tracks:
if not spotify_track['id']: continue
@@ -213,51 +234,83 @@ def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.Spotify
''' gets list of corresponding tidal track ids for each spotify track, ignoring duplicates '''
output = []
seen_tracks = set()
for spotify_track in spotify_tracks:
if not spotify_track['id']: continue
tidal_id = track_match_cache.get(spotify_track['id'])
if tidal_id and not tidal_id in seen_tracks:
if tidal_id:
if tidal_id in seen_tracks:
track_name = spotify_track['name']
artist_names = ', '.join([artist['name'] for artist in spotify_track['artists']])
print(f'Duplicate found: Track "{track_name}" by {artist_names} will be ignored')
else:
output.append(tidal_id)
seen_tracks.add(tidal_id)
return output
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config):
async def search_new_tracks_on_tidal(tidal_session: tidalapi.Session, spotify_tracks: Sequence[t_spotify.SpotifyTrack], playlist_name: str, config: dict):
""" Generic function for searching for each item in a list of Spotify tracks which have not already been seen and adding them to the cache """
async def _run_rate_limiter(semaphore):
''' Leaky bucket algorithm for rate limiting. Periodically releases an item from semaphore at rate_limit'''
''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit'''
_sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket'
t0 = datetime.datetime.now()
while True:
await asyncio.sleep(1/config.get('rate_limit', 12)) # sleep for min time between new function executions
semaphore.release() # leak one item from the 'bucket'
await asyncio.sleep(_sleep_time)
t = datetime.datetime.now()
dt = (t - t0).total_seconds()
new_items = round(config.get('rate_limit', 10)*dt)
t0 = t
[semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket'
# Create a new Tidal playlist if required
if not tidal_playlist:
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])
# Extract the new tracks from the playlist that we haven't already seen before
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
old_tidal_tracks = tidal_playlist.tracks()
tracks_to_search = get_new_tracks_from_spotify_playlist(spotify_tracks, old_tidal_tracks)
# Extract the new tracks that do not already exist in the old tidal tracklist
tracks_to_search = get_new_spotify_tracks(spotify_tracks)
if not tracks_to_search:
print("No new tracks to search in Spotify playlist '{}'".format(spotify_playlist['name']))
return
# Search for each of the tracks on Tidal concurrently
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), spotify_playlist['name'])
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), playlist_name)
semaphore = asyncio.Semaphore(config.get('max_concurrency', 10))
rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore))
search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description )
rate_limiter_task.cancel()
# Add the search results to the cache
song404 = []
for idx, spotify_track in enumerate(tracks_to_search):
if search_results[idx]:
track_match_cache.insert( (spotify_track['id'], search_results[idx].id) )
else:
song404.append(f"{spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}")
color = ('\033[91m', '\033[0m')
print(color[0] + "Could not find track {}: {} - {}".format(spotify_track['id'], ",".join([a['name'] for a in spotify_track['artists']]), spotify_track['name']) + color[1])
print(color[0] + "Could not find the track " + song404[-1] + color[1])
file_name = "songs not found.txt"
header = f"==========================\nPlaylist: {playlist_name}\n==========================\n"
with open(file_name, "a", encoding="utf-8") as file:
file.write(header)
for song in song404:
file.write(f"{song}\n")
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config: dict):
""" sync given playlist to tidal """
# Get the tracks from both Spotify and Tidal, creating a new Tidal playlist if necessary
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
if len(spotify_tracks) == 0:
return # nothing to do
if tidal_playlist:
old_tidal_tracks = await get_all_playlist_tracks(tidal_playlist)
else:
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])
old_tidal_tracks = []
# Extract the new tracks from the playlist that we haven't already seen before
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
await search_new_tracks_on_tidal(tidal_session, spotify_tracks, spotify_playlist['name'], config)
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
# Update the Tidal playlist if there are changes
old_tidal_track_ids = [t.id for t in old_tidal_tracks]
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
if new_tidal_track_ids == old_tidal_track_ids:
print("No changes to write to Tidal playlist")
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
@@ -265,13 +318,51 @@ async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalap
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):])
else:
# Erase old playlist and add new tracks from scratch if any reordering occured
set_tidal_playlist(tidal_playlist, new_tidal_track_ids)
clear_tidal_playlist(tidal_playlist)
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids)
def sync_list(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config):
async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict):
""" sync user favorites to tidal """
async def get_tracks_from_spotify_favorites() -> List[dict]:
_get_favorite_tracks = lambda offset: spotify_session.current_user_saved_tracks(offset=offset)
tracks = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, _get_favorite_tracks)
tracks.reverse()
return tracks
def get_new_tidal_favorites() -> List[int]:
existing_favorite_ids = set([track.id for track in old_tidal_tracks])
new_ids = []
for spotify_track in spotify_tracks:
match_id = track_match_cache.get(spotify_track['id'])
if match_id and not match_id in existing_favorite_ids:
new_ids.append(match_id)
return new_ids
print("Loading favorite tracks from Spotify")
spotify_tracks = await get_tracks_from_spotify_favorites()
print("Loading existing favorite tracks from Tidal")
old_tidal_tracks = await get_all_favorites(tidal_session.user.favorites, order='DATE')
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
await search_new_tracks_on_tidal(tidal_session, spotify_tracks, "Favorites", config)
new_tidal_favorite_ids = get_new_tidal_favorites()
if new_tidal_favorite_ids:
for tidal_id in tqdm(new_tidal_favorite_ids, desc="Adding new tracks to Tidal favorites"):
tidal_session.user.favorites.add_track(tidal_id)
else:
print("No new tracks to add to Tidal favorites")
def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config: dict):
for spotify_playlist, tidal_playlist in playlists:
# sync the spotify playlist to tidal
asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) )
def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config))
def get_tidal_playlists_wrapper(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
tidal_playlists = asyncio.run(get_all_playlists(tidal_session.user))
return {playlist.name: playlist for playlist in tidal_playlists}
def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]):
if spotify_playlist['name'] in tidal_playlists:
# if there's an existing tidal playlist with the name of the current playlist then use that
@@ -283,39 +374,45 @@ def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists:
def get_user_playlist_mappings(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
results = []
spotify_playlists = asyncio.run(get_playlists_from_spotify(spotify_session, config))
tidal_playlists = get_tidal_playlists_dict(tidal_session)
tidal_playlists = get_tidal_playlists_wrapper(tidal_session)
for spotify_playlist in spotify_playlists:
results.append( pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) )
return results
async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config):
# get all the user playlists from the Spotify account
# get all the playlists from the Spotify account
playlists = []
print("Loading Spotify playlists")
results = spotify_session.user_playlists(config['spotify']['username'])
first_results = spotify_session.current_user_playlists()
exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])])
playlists.extend([p for p in first_results['items']])
user_id = spotify_session.current_user()['id']
# get all the remaining playlists in parallel
if results['next']:
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ]
extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.user_playlists, config['spotify']['username'], offset=offset) for offset in offsets ] )
if first_results['next']:
offsets = [ first_results['limit'] * n for n in range(1, math.ceil(first_results['total']/first_results['limit'])) ]
extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.current_user_playlists, offset=offset) for offset in offsets ] )
for extra_result in extra_results:
playlists.extend([p for p in extra_result['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list])
return playlists
playlists.extend([p for p in extra_result['items']])
# filter out playlists that don't belong to us or are on the exclude list
my_playlist_filter = lambda p: p and p['owner']['id'] == user_id
exclude_filter = lambda p: not p['id'] in exclude_list
return list(filter( exclude_filter, filter( my_playlist_filter, playlists )))
def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
# get the list of playlist sync mappings from the configuration file
def get_playlist_ids(config):
return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']]
output = []
for spotify_id, tidal_id in get_playlist_ids(config):
for spotify_id, tidal_id in get_playlist_ids(config=config):
try:
spotify_playlist = spotify_session.playlist(spotify_id)
spotify_playlist = spotify_session.playlist(playlist_id=spotify_id)
except spotipy.SpotifyException as e:
print(f"Error getting Spotify playlist {spotify_id}")
raise e
try:
tidal_playlist = tidal_session.playlist(tidal_id)
tidal_playlist = tidal_session.playlist(playlist_id=tidal_id)
except Exception as e:
print(f"Error getting Tidal playlist {tidal_id}")
raise e

View File

@@ -1,6 +1,9 @@
import asyncio
import math
from typing import List
import tidalapi
from tqdm import tqdm
from tqdm.asyncio import tqdm as atqdm
def _remove_indices_from_playlist(playlist: tidalapi.UserPlaylist, indices: List[int]):
headers = {'If-None-Match': playlist._etag}
@@ -24,6 +27,53 @@ def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids:
offset += count
progress.update(count)
def set_tidal_playlist(playlist: tidalapi.Playlist, track_ids: List[int]):
clear_tidal_playlist(playlist)
add_multiple_tracks_to_playlist(playlist, track_ids)
async def _get_all_chunks(url, session, parser, params={}) -> List[tidalapi.Track]:
"""
Helper function to get all items from a Tidal endpoint in parallel
The main library doesn't provide the total number of items or expose the raw json, so use this wrapper instead
"""
def _make_request(offset: int=0):
new_params = params
new_params['offset'] = offset
return session.request.map_request(url, params=new_params)
first_chunk_raw = _make_request()
limit = first_chunk_raw['limit']
total = first_chunk_raw['totalNumberOfItems']
items = session.request.map_json(first_chunk_raw, parse=parser)
if len(items) < total:
offsets = [limit * n for n in range(1, math.ceil(total/limit))]
extra_results = await atqdm.gather(
*[asyncio.to_thread(lambda offset: session.request.map_json(_make_request(offset), parse=parser), offset) for offset in offsets],
desc="Fetching additional data chunks"
)
for extra_result in extra_results:
items.extend(extra_result)
return items
async def get_all_favorites(favorites: tidalapi.Favorites, order: str = "NAME", order_direction: str = "ASC", chunk_size: int=100) -> List[tidalapi.Track]:
""" Get all favorites from Tidal playlist in chunks """
params = {
"limit": chunk_size,
"order": order,
"orderDirection": order_direction,
}
return await _get_all_chunks(f"{favorites.base_url}/tracks", session=favorites.session, parser=favorites.session.parse_track, params=params)
async def get_all_playlists(user: tidalapi.User, chunk_size: int=10) -> List[tidalapi.Playlist]:
""" Get all user playlists from Tidal in chunks """
print(f"Loading playlists from Tidal user")
params = {
"limit": chunk_size,
}
return await _get_all_chunks(f"users/{user.id}/playlists", session=user.session, parser=user.playlist.parse_factory, params=params)
async def get_all_playlist_tracks(playlist: tidalapi.Playlist, chunk_size: int=20) -> List[tidalapi.Track]:
""" Get all tracks from Tidal playlist in chunks """
params = {
"limit": chunk_size,
}
print(f"Loading tracks from Tidal playlist '{playlist.name}'")
return await _get_all_chunks(f"{playlist._base_url%playlist.id}/tracks", session=playlist.session, parser=playlist.session.parse_track, params=params)

5
tests/conftest.py Normal file
View File

@@ -0,0 +1,5 @@
import sys
import os
# Add the src directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))

0
tests/unit/__init__.py Normal file
View File

75
tests/unit/test_auth.py Normal file
View File

@@ -0,0 +1,75 @@
# tests/unit/test_auth.py
import pytest
import spotipy
import tidalapi
import yaml
import sys
from unittest import mock
from spotify_to_tidal.auth import open_spotify_session, open_tidal_session, SPOTIFY_SCOPES
def test_open_spotify_session(mocker):
# Mock the SpotifyOAuth class
mock_spotify_oauth = mocker.patch(
"spotify_to_tidal.auth.spotipy.SpotifyOAuth", autospec=True
)
mock_spotify_instance = mocker.patch(
"spotify_to_tidal.auth.spotipy.Spotify", autospec=True
)
# Define a mock configuration
mock_config = {
"username": "test_user",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"redirect_uri": "http://127.0.0.1/",
"open_browser": True,
}
# Create a mock SpotifyOAuth instance
mock_oauth_instance = mock_spotify_oauth.return_value
mock_oauth_instance.get_access_token.return_value = "mock_access_token"
# Call the function under test
spotify_instance = open_spotify_session(mock_config)
# Assert that the SpotifyOAuth was called with correct parameters
mock_spotify_oauth.assert_called_once_with(
username="test_user",
scope=SPOTIFY_SCOPES,
client_id="test_client_id",
client_secret="test_client_secret",
redirect_uri="http://127.0.0.1/",
requests_timeout=2,
open_browser=True,
)
# Assert that the Spotify instance was created
mock_spotify_instance.assert_called_once_with(oauth_manager=mock_oauth_instance)
assert spotify_instance == mock_spotify_instance.return_value
def test_open_spotify_session_oauth_error(mocker):
# Mock the SpotifyOAuth class and simulate an OAuth error
mock_spotify_oauth = mocker.patch(
"spotify_to_tidal.auth.spotipy.SpotifyOAuth", autospec=True
)
mock_spotify_oauth.return_value.get_access_token.side_effect = (
spotipy.SpotifyOauthError("mock error")
)
# Define a mock configuration
mock_config = {
"username": "test_user",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"redirect_uri": "http://127.0.0.1/",
}
# Mock sys.exit to prevent the test from exiting
mock_sys_exit = mocker.patch("sys.exit")
# Call the function under test and assert sys.exit is called
open_spotify_session(mock_config)
mock_sys_exit.assert_called_once()

80
tests/unit/test_cache.py Normal file
View File

@@ -0,0 +1,80 @@
# tests/unit/test_cache.py
import pytest
import datetime
import sqlalchemy
from sqlalchemy import create_engine, select
from unittest import mock
from spotify_to_tidal.cache import MatchFailureDatabase, TrackMatchCache
# Setup an in-memory SQLite database for testing
@pytest.fixture
def in_memory_db():
engine = create_engine("sqlite:///:memory:")
return engine
# Test MatchFailureDatabase
def test_cache_match_failure(in_memory_db, mocker):
mocker.patch(
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
)
failure_db = MatchFailureDatabase()
track_id = "test_track"
failure_db.cache_match_failure(track_id)
with failure_db.engine.connect() as connection:
result = connection.execute(
select(failure_db.match_failures).where(
failure_db.match_failures.c.track_id == track_id
)
).fetchone()
assert result is not None
assert result.track_id == track_id
def test_has_match_failure(in_memory_db, mocker):
mocker.patch(
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
)
failure_db = MatchFailureDatabase()
track_id = "test_track"
failure_db.cache_match_failure(track_id)
assert failure_db.has_match_failure(track_id) is True
def test_remove_match_failure(in_memory_db, mocker):
mocker.patch(
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
)
failure_db = MatchFailureDatabase()
track_id = "test_track"
failure_db.cache_match_failure(track_id)
failure_db.remove_match_failure(track_id)
with failure_db.engine.connect() as connection:
result = connection.execute(
select(failure_db.match_failures).where(
failure_db.match_failures.c.track_id == track_id
)
).fetchone()
assert result is None
# Test TrackMatchCache
def test_track_match_cache_insert():
track_cache = TrackMatchCache()
track_cache.insert(("spotify_id", 123))
assert track_cache.get("spotify_id") == 123
def test_track_match_cache_get():
track_cache = TrackMatchCache()
track_cache.insert(("spotify_id", 123))
assert track_cache.get("spotify_id") == 123
assert track_cache.get("nonexistent_id") is None