34 Commits
0.1.4 ... main

Author SHA1 Message Date
2f762eed4b Standalone App; Works fine but only for MY OWN PLAYLISTS due to spotify dev rights :-( 2026-03-15 20:35:46 +01:00
Tim Rae
8a66f5f864 Fix bug in pyproject.toml: use tilde operator 2026-01-11 11:24:32 +01:00
Tim Rae
299cc3209f Don't specify point version of spotify and tidal APIs
So we don't have to put out a new release to update the dependency
2025-12-31 15:01:30 +01:00
Tim Rae
46771aefc8 Bump version to 1.0.6 2025-12-31 14:59:03 +01:00
Tim Rae
eb745eab6f Update tidalapi version 2025-12-31 14:54:44 +01:00
Alex Camilleri
9ff181ae6e Playlist name in txt output (#140) 2025-10-20 09:36:04 +02:00
Tim Rae
d90c5bac41 Update tidalapi dependency version to 0.8.8 (#136)
Fixes #135 #134
2025-10-16 12:22:15 +02:00
Axel
e62a8a80cb use 127.0.0.1 instead of localhost to meet new spotify URI criteria (#116) 2025-04-09 22:14:15 +02:00
Darkasf
a5afd975f0 Update readme.md (#113) 2025-03-30 06:51:13 +02:00
Johan Lindbergh
03e0396ac0 Add another check to the spotify track sanity filter (#111)
Discovered a crash when syncing a playlist with some random podcast
episode. It crashed because both ['artists'][0]['name'] and
['album']['artists'][0]['name'] was None.

I thought the episode would be caught by the track_filter, but
apparently having the 'type': 'episode' isn't reliable enough :/

Not sure if the check should be in the sanity_filter or any of
the _search_for... methods though.
2025-02-13 22:50:00 +01:00
Tim Rae
a438cda72b Bump version to 1.0.4 2024-12-04 06:09:12 +09:00
Tim Rae
2f1985a42b Check for null playlists in filter 2024-12-04 05:46:30 +09:00
Tim Rae
bcf2bbca0d Add option to do auth manually on headless server (#88) 2024-10-26 11:12:47 +02:00
Tim Rae
457da1724f Bump version to 1.0.3 2024-10-14 01:25:58 +02:00
Tim Rae
4d7c3b0ef0 Filter out tracks which don't have valid album metadata
Apply a final sanity filter to tracklist to validate assumption in matching
algorithm that the album has certain fields available.

In most situations this filter is not necessary, but occasionally we do
seem to encounter tracks that have no album metadata
2024-10-14 01:25:48 +02:00
Tim Rae
4fb702d008 Filter out podcast episodes (#83)
* Filter out podcast episodes from playlists
* Don't create empty playlists
* Bump version 1.0.2

Fixes #30
2024-09-28 08:34:58 +02:00
Tim Rae
693dcd110f Update version to 1.0.1 2024-09-12 16:59:45 +02:00
Tim Rae
17d60c019d Use user id from spotify session instead of config file
Fixes #80
2024-09-11 19:18:13 +02:00
mannp
7148053ad4 Update pyproject.toml (#79) 2024-09-09 19:03:56 +02:00
Tim Rae
64ea6f3df0 Bump version to 1.0.0
There have been some pretty significant changes recently
so figured it's time for a major version bump
2024-09-09 12:28:32 +02:00
Tim Rae
da1bc2eff9 Use utf-8 encoding for tracks not found 2024-09-09 10:02:22 +02:00
Tim Rae
5e3645319f Use current_user_playlists instead of user_playlists 2024-08-19 07:57:35 +02:00
Tim Rae
af0fd7a2cf Add retry functionality for playlist tracks 2024-08-19 07:57:35 +02:00
som
4f023d024b creates file for songs not found (#74) 2024-08-15 09:58:45 +02:00
Arnav Banerjee
54bb7097a7 fixed failing test (#68) 2024-06-11 20:32:31 +02:00
xerexoded
692939cd0e added circle CI config with basic unit tests 2024-06-10 19:29:13 +02:00
Tim Rae
ced8db44fe Fetch Tidal playlists/tracks asynchronously in chunks 2024-06-10 10:00:57 +02:00
Tim Rae
193e74b775 Never erase tracks from Tidal favorites 2024-06-10 09:32:44 +02:00
Tim Rae
7a1343ac91 Add config param for sync_favorites_default and cleanup command line arg 2024-06-10 09:32:44 +02:00
Tim Rae
6d6a4fe43e Fix favorite tracks getting cutoff at 1000 2024-06-10 09:32:44 +02:00
Tim Rae
1cdc62fd0c Cleanup partitioning of code used in favorites / playlist list 2024-06-10 09:32:44 +02:00
Cobal
54526a0306 feat: Sync favorites tracks to Tidal 2024-06-10 09:32:44 +02:00
Tim Rae
fab154851b Don't cache Tidal tracks that are unavailable
These would generally fail to be added to the Tidal playlist and could
cause playlists to be unnecessarily wiped every time the playlist is
synced
2024-06-09 12:28:52 +02:00
Raphael
b6340790ca Print duplicates that occurred while syncing (#64) 2024-06-09 10:50:10 +02:00
18 changed files with 1010 additions and 78 deletions

38
.circleci/config.yml Normal file
View File

@@ -0,0 +1,38 @@
version: 2.1
jobs:
setup:
docker:
- image: circleci/python:3.10
steps:
- checkout
- run:
name: Install dependencies
command: |
python -m pip install --upgrade pip
pip install -e .
pip install pytest pytest-mock
test:
docker:
- image: circleci/python:3.10
steps:
- checkout
- run:
name: Install dependencies
command: |
python -m pip install --upgrade pip
pip install -e .
pip install pytest pytest-mock
- run:
name: Run tests
command: pytest
workflows:
version: 2
test_workflow:
jobs:
- setup
- test:
requires:
- setup

4
.gitignore vendored
View File

@@ -3,6 +3,7 @@ config.yml
config.yaml config.yaml
.cache* .cache*
.session.yml .session.yml
songs not found.txt
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
@@ -30,3 +31,6 @@ venv/
.mypy_cache/ .mypy_cache/
.dmypy.json .dmypy.json
dmypy.json dmypy.json
# Visual Studio Code
.vscode/

11
Launch Spotify→Tidal.command Executable file
View File

@@ -0,0 +1,11 @@
#!/bin/zsh
# Double-click this file in Finder to launch the Spotify→Tidal sync app.
# Change to the project directory (same folder as this script)
cd "$(dirname "$0")"
# Activate the virtual environment
source .venv/bin/activate
# Launch Streamlit (opens browser automatically)
streamlit run app.py

417
app.py Normal file
View File

@@ -0,0 +1,417 @@
import json
import math
import yaml
import asyncio
import pandas as pd
import streamlit as st
from pathlib import Path
import spotipy
from spotify_to_tidal import auth as _auth
from spotify_to_tidal import sync as _sync
from spotify_to_tidal.tidalapi_patch import get_all_playlists
# ── Constants ─────────────────────────────────────────────────────────────────
SESSION_FILE = Path("session.json")
NOT_FOUND_FILE = Path("songs not found.txt")
# ── Session persistence ───────────────────────────────────────────────────────
def load_session() -> dict:
if SESSION_FILE.exists():
try:
return json.loads(SESSION_FILE.read_text())
except Exception:
pass
return {"watched": [], "external_playlists": []}
def save_session(data: dict):
SESSION_FILE.write_text(json.dumps(data, indent=2))
def load_watched() -> set:
return set(load_session().get("watched", []))
def save_watched(ids: set):
data = load_session()
data["watched"] = sorted(ids)
save_session(data)
def load_external_playlists() -> list:
return load_session().get("external_playlists", [])
def upsert_external_playlist(info: dict):
data = load_session()
ext = [p for p in data.get("external_playlists", []) if p.get("id") != info["id"]]
ext.append(info)
data["external_playlists"] = ext
save_session(data)
def remove_external_playlist(pid: str):
data = load_session()
data["external_playlists"] = [p for p in data.get("external_playlists", []) if p.get("id") != pid]
save_session(data)
# ── Not-found parser ──────────────────────────────────────────────────────────
def parse_not_found() -> dict:
"""Returns {playlist_name: [(artist, title), ...]}"""
result = {}
if not NOT_FOUND_FILE.exists():
return result
current = None
for raw in NOT_FOUND_FILE.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("=="):
continue
if line.startswith("Playlist:"):
current = line[len("Playlist:"):].strip()
result.setdefault(current, [])
elif current is not None and ": " in line:
_, rest = line.split(": ", 1)
artist, title = (rest.split(" - ", 1) if " - " in rest else ("", rest))
result[current].append((artist.strip(), title.strip()))
return result
# ── Page config ───────────────────────────────────────────────────────────────
st.set_page_config(page_title="Spotify → Tidal Sync", layout="wide")
st.title("🎵 Spotify → Tidal Sync")
# ── Auth ──────────────────────────────────────────────────────────────────────
@st.cache_resource
def get_sessions():
try:
with open("config.yml", "r") as f:
config = yaml.safe_load(f)
except Exception as e:
st.error(f"Failed to load config.yml: {e}"); st.stop()
try:
sp = _auth.open_spotify_session(config["spotify"])
td = _auth.open_tidal_session()
if not td.check_login():
st.error("Could not connect to Tidal."); st.stop()
return config, sp, td
except Exception as e:
st.error(f"Authentication failed: {e}"); st.stop()
config, spotify_session, tidal_session = get_sessions()
# ── Data fetchers ─────────────────────────────────────────────────────────────
@st.cache_data
def fetch_my_playlists():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(_sync.get_playlists_from_spotify(spotify_session, config))
loop.close()
return result
@st.cache_data(show_spinner=False)
def fetch_playlist_tracks(pid: str, name: str) -> tuple:
"""Returns (tracks: list, error: str|None). Uses direct API — no sys.exit retry wrapper."""
try:
all_tracks = []
result = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=0)
if result is None:
return [], "Empty response from Spotify."
def _extract(r):
return [
item["track"] for item in (r.get("items") or [])
if item.get("track") and item["track"].get("id")
]
all_tracks.extend(_extract(result))
total = result.get("total", 0)
if result.get("next") and total > 50:
for offset in range(50, total, 50):
page = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=offset)
if page:
all_tracks.extend(_extract(page))
return all_tracks, None
except spotipy.SpotifyException as e:
if e.http_status == 403:
return [], "🔒 Access denied (403) — this playlist may be private or collaborative."
return [], f"Spotify error {e.http_status}: {e.msg}"
except Exception as e:
return [], f"Error loading tracks: {e}"
@st.cache_data(show_spinner=False)
def fetch_single_playlist(pid: str) -> dict | None:
try:
return spotify_session.playlist(pid, fields="id,name,description,tracks.total,owner")
except Exception as e:
print(f"[ERROR] fetch_single_playlist: {e}")
return None
# ── Sync helper ───────────────────────────────────────────────────────────────
def run_sync_playlists(playlists: list):
async def _do():
tidal_map = {p.name: p for p in await get_all_playlists(tidal_session.user)}
total = len(playlists)
bar = st.progress(0, text=f"Starting sync of {total} playlist(s)…")
skipped = []
for i, sp in enumerate(playlists):
bar.progress(i / total, text=f"[{i+1}/{total}] {sp['name']}")
try:
await _sync.sync_playlist(spotify_session, tidal_session, sp, tidal_map.get(sp["name"]), config)
except RuntimeError as e:
skipped.append((sp["name"], str(e)))
print(f"[SKIP] {sp['name']}: {e}")
bar.progress(1.0, text="✅ Done!")
return skipped
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
skipped = loop.run_until_complete(_do())
loop.close()
return skipped or []
# ── Load state ────────────────────────────────────────────────────────────────
with st.spinner("Fetching your Spotify playlists…"):
spotify_playlists = fetch_my_playlists()
if not spotify_playlists:
st.warning("No Spotify playlists found!"); st.stop()
watched_ids = load_watched()
not_found = parse_not_found()
total_nf = sum(len(v) for v in not_found.values())
# ── Tabs ──────────────────────────────────────────────────────────────────────
tab_my, tab_ext, tab_nf = st.tabs([
f"🎵 My Playlists ({len(spotify_playlists)})",
"🔗 External Playlists",
f"❌ Not Found ({total_nf} tracks)",
])
# ════════════════════════════════════════════════════════
# TAB 1 MY PLAYLISTS (expandable rows)
# ════════════════════════════════════════════════════════
with tab_my:
# Collect playlists selected for sync via session_state checkboxes
sync_btn_col, wtch_btn_col, _ = st.columns([2, 2, 4])
# We need to render the list first so state keys exist, then read them
# ── Column headers ────────────────────────────────
hdr = st.columns([6, 1, 1, 1])
hdr[0].markdown("**Playlist**")
hdr[1].markdown("**Tracks**")
hdr[2].markdown("**Watch**")
hdr[3].markdown("**Sync**")
st.divider()
current_watched = load_watched() # fresh from disk
# Sort: watched first, then rest
sorted_playlists = (
[p for p in spotify_playlists if p["id"] in current_watched] +
[p for p in spotify_playlists if p["id"] not in current_watched]
)
showed_watched_header = False
showed_rest_header = False
for p in sorted_playlists:
pid = p["id"]
name = p["name"]
nf_list = not_found.get(name, [])
is_watched = pid in current_watched
tracks_key = f"tracks_{pid}"
# Section headers
if is_watched and not showed_watched_header:
st.markdown("### 👁 Watched Playlists")
showed_watched_header = True
elif not is_watched and not showed_rest_header:
if showed_watched_header:
st.divider()
st.markdown("### 🎵 All Playlists")
showed_rest_header = True
# Use cached tracks from session_state if already loaded
cached_tracks: list | None = st.session_state.get(tracks_key)
track_count = len(cached_tracks) if cached_tracks is not None else None
# Build expander label
count_str = str(track_count) if track_count is not None else ""
prefix = "" if is_watched else ""
warn = " ⚠️" if nf_list else ""
label = f"{prefix}{name}{count_str} tracks{warn}"
with st.expander(label, expanded=False):
left, right = st.columns([3, 1])
with right:
# Watch toggle
new_watch = st.checkbox("👁 Watch", value=is_watched, key=f"watch_{pid}")
if new_watch != is_watched:
if new_watch:
current_watched.add(pid)
else:
current_watched.discard(pid)
save_watched(current_watched)
st.rerun()
# Sync checkbox (persists in session_state automatically)
st.checkbox("▶ Add to sync", key=f"sync_{pid}")
with left:
if cached_tracks is None:
if st.button("🔍 Load tracks", key=f"load_{pid}"):
with st.spinner(f"Loading '{name}'"):
tracks, err = fetch_playlist_tracks(pid, name)
st.session_state[tracks_key] = tracks
if err:
st.session_state[f"tracks_err_{pid}"] = err
st.rerun()
else:
err = st.session_state.get(f"tracks_err_{pid}")
if err:
st.warning(err)
elif cached_tracks:
nf_titles = {title for _, title in nf_list}
rows = [
{
"": "" if t["name"] in nf_titles else "",
"Title": t["name"],
"Artist": ", ".join(a["name"] for a in t.get("artists", [])),
}
for t in cached_tracks
]
st.dataframe(
pd.DataFrame(rows),
hide_index=True,
use_container_width=True,
height=min(40 + len(rows) * 35, 300),
column_config={
"": st.column_config.TextColumn("", width="small"),
"Title": st.column_config.TextColumn("Title"),
"Artist": st.column_config.TextColumn("Artist"),
},
)
if nf_list:
st.caption(f"⚠️ {len(nf_list)} track(s) not found on Tidal")
else:
st.info("No tracks found for this playlist.")
st.divider()
# ── Gather selected playlists & action buttons ────
sel_pls = [p for p in spotify_playlists if st.session_state.get(f"sync_{p['id']}", False)]
wtch_pls = [p for p in spotify_playlists if p["id"] in load_watched()]
with sync_btn_col:
if st.button(f"▶ Sync Selected ({len(sel_pls)})", disabled=not sel_pls, type="primary", width="stretch"):
skipped = run_sync_playlists(sel_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(sel_pls) - len(skipped)
if synced:
st.success(f"{synced} playlist(s) synced!")
st.balloons()
with wtch_btn_col:
if st.button(f"👁 Sync Watched ({len(wtch_pls)})", disabled=not wtch_pls, width="stretch"):
skipped = run_sync_playlists(wtch_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(wtch_pls) - len(skipped)
if synced:
st.success(f"{synced} watched playlist(s) synced!")
st.balloons()
# ════════════════════════════════════════════════════════
# TAB 2 EXTERNAL PLAYLISTS
# ════════════════════════════════════════════════════════
with tab_ext:
st.subheader("Add a friend's playlist")
url_input = st.text_input(
"Spotify URL or ID",
placeholder="https://open.spotify.com/playlist/… or 3ktReNcDeGj94W59UXHU9R",
label_visibility="collapsed",
)
if st.button(" Add Playlist", type="primary"):
pid = url_input.strip()
if "spotify.com/playlist/" in pid:
pid = pid.split("spotify.com/playlist/")[-1].split("?")[0].strip()
if pid:
with st.spinner("Looking up playlist…"):
info = fetch_single_playlist(pid)
if info:
tf = info.get("tracks") or {}
upsert_external_playlist({
"id": info["id"],
"name": info["name"],
"description": info.get("description", ""),
"total_tracks": tf.get("total", 0) if isinstance(tf, dict) else 0,
"owner": (info.get("owner") or {}).get("display_name", ""),
})
st.success(f"✅ Added **{info['name']}**")
st.rerun()
else:
st.error("Playlist not found. Is it public? Check the URL/ID.")
else:
st.warning("Please enter a URL or ID.")
external = load_external_playlists()
if external:
st.subheader(f"Saved ({len(external)})")
ext_rows = [{"▶ Sync": False, "Playlist": p["name"], "Owner": p.get("owner",""), "Tracks": p.get("total_tracks",0), "ID": p["id"]} for p in external]
edit_ext = st.data_editor(
pd.DataFrame(ext_rows),
column_config={
"▶ Sync": st.column_config.CheckboxColumn("▶ Sync", width="small"),
"Playlist": st.column_config.TextColumn("Playlist", disabled=True),
"Owner": st.column_config.TextColumn("Owner", disabled=True, width="medium"),
"Tracks": st.column_config.NumberColumn("Tracks", disabled=True, width="small"),
"ID": st.column_config.TextColumn("Spotify ID", disabled=True, width="medium"),
},
hide_index=True, width="stretch", key="ext_editor",
)
ext_sel = [p for p in external if p["id"] in set(edit_ext[edit_ext["▶ Sync"]]["ID"].tolist())]
if st.button(f"▶ Sync Selected ({len(ext_sel)})", disabled=not ext_sel, type="primary", width="stretch", key="btn_sync_ext"):
with st.spinner("Fetching playlist details…"):
full_pls = [spotify_session.playlist(p["id"]) for p in ext_sel]
skipped = run_sync_playlists(full_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(full_pls) - len(skipped)
if synced:
st.success(f"{synced} external playlist(s) synced!")
st.divider()
rm = st.selectbox("Remove", ["— select —"] + [p["name"] for p in external], key="rm_ext")
if rm != "— select —" and st.button("🗑 Remove", type="secondary"):
rid = next((p["id"] for p in external if p["name"] == rm), None)
if rid:
remove_external_playlist(rid)
fetch_single_playlist.clear()
st.rerun()
else:
st.info("No external playlists added yet.")
# ════════════════════════════════════════════════════════
# TAB 3 NOT FOUND
# ════════════════════════════════════════════════════════
with tab_nf:
if not not_found:
st.success("✅ No unmatched tracks yet — run a sync first.")
else:
st.info(f"**{total_nf}** track(s) across **{len(not_found)}** playlist(s) couldn't be matched on Tidal.")
flt = st.selectbox("Filter by playlist", ["All playlists"] + sorted(not_found.keys()), key="nf_filter")
nf_rows = [
{"Playlist": pl, "Artist": a, "Title": t}
for pl, items in sorted(not_found.items())
if flt in ("All playlists", pl)
for a, t in items
]
if nf_rows:
st.dataframe(pd.DataFrame(nf_rows), hide_index=True, use_container_width=True,
height=min(80 + len(nf_rows) * 35, 500))
else:
st.info("No entries for this filter.")
# ── Liked Songs (opt-in) ──────────────────────────────────────────────────────
with st.expander("⭐ Sync Liked Songs (optional)", expanded=False):
st.write("Sync your Spotify 'Liked Songs' to your Tidal favorites.")
if st.button("Sync Liked Songs Now", type="secondary"):
with st.spinner("Syncing liked songs…"):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(_sync.sync_favorites(spotify_session, tidal_session, config))
loop.close()
st.success("✅ Liked songs synced!")

View File

@@ -2,7 +2,8 @@ spotify:
client_id: your_client_id client_id: your_client_id
client_secret: your_client_secret client_secret: your_client_secret
username: your_spotify_username username: your_spotify_username
redirect_uri: http://localhost:8888/callback redirect_uri: http://127.0.0.1:8888/callback
open_browser: True # Set to False if using a headless server environment
# uncomment this block if you want to only sync specific playlist IDs # uncomment this block if you want to only sync specific playlist IDs
@@ -14,6 +15,11 @@ spotify:
#excluded_playlists: #excluded_playlists:
# - spotify:playlist:1ABCDEqsABCD6EaABCDa0a # - spotify:playlist:1ABCDEqsABCD6EaABCDa0a
# default setting for syncing favorites when no command line arguments are provided
# - when true: favorites will be synced by default (overriden when any command line arg provided)
# - when false: favorites can only be synced manually via --sync-favorites argument
sync_favorites_default: true
# increasing these parameters should increase the search speed, while decreasing reduces likelihood of 429 errors # increasing these parameters should increase the search speed, while decreasing reduces likelihood of 429 errors
max_concurrency: 10 # max concurrent connections at any given time max_concurrency: 10 # max concurrent connections at any given time
rate_limit: 10 # max sustained connections per second rate_limit: 10 # max sustained connections per second

113
interactive_sync.py Normal file
View File

@@ -0,0 +1,113 @@
import yaml
import sys
import asyncio
from typing import List, Dict, Any
from spotify_to_tidal import auth as _auth
from spotify_to_tidal import sync as _sync
from spotify_to_tidal.tidalapi_patch import get_all_playlists
def print_menu(playlists: List[Dict[str, Any]]):
print("\n" + "="*50)
print(f"🎵 Spotify Playlists Found ({len(playlists)} total) 🎵")
print("="*50)
for i, p in enumerate(playlists):
track_count = p.get('tracks', {}).get('total', 'Unknown')
print(f"[{i+1:2d}] {p['name']:<35} ({track_count:>3} tracks) | ID: {p['id']}")
print("="*50)
print("Enter the numbers of the playlists you want to sync, separated by commas (e.g., 1, 4, 12).")
print("Enter 'ALL' to sync all playlists.")
print("Enter 'Q' to quit.")
async def main_async():
# 1. Load config
try:
with open('config.yml', 'r') as f:
config = yaml.safe_load(f)
except Exception as e:
sys.exit(f"Failed to load config.yml: {e}")
# 2. Authenticate
print("Authenticating with Spotify...")
spotify_session = _auth.open_spotify_session(config['spotify'])
print("Authenticating with Tidal...")
tidal_session = _auth.open_tidal_session()
if not tidal_session.check_login():
sys.exit("Could not connect to Tidal")
# 3. Fetch all Spotify user playlists
print("\nFetching Spotify playlists...")
spotify_playlists = await _sync.get_playlists_from_spotify(spotify_session, config)
if not spotify_playlists:
print("No Spotify playlists found!")
return
# 4. Interactive menu loop
while True:
print_menu(spotify_playlists)
choice = input("\nYour choice: ").strip().lower()
if choice == 'q':
print("Exiting...")
return
selected_playlists = []
if choice == 'all':
selected_playlists = spotify_playlists
print(f"Selected ALL {len(selected_playlists)} playlists for synchronization.")
break
else:
try:
# Parse comma-separated indices
indices = [int(x.strip()) - 1 for x in choice.split(',')]
for idx in indices:
if 0 <= idx < len(spotify_playlists):
selected_playlists.append(spotify_playlists[idx])
else:
print(f"Warning: Number {idx + 1} is out of range, ignoring.")
if not selected_playlists:
print("No valid playlists selected. Please try again.")
continue
print(f"Selected {len(selected_playlists)} playlists for synchronization.")
for p in selected_playlists:
print(f" - {p['name']}")
confirm = input("Proceed? (Y/n): ").strip().lower()
if confirm in ('', 'y', 'yes'):
break
except ValueError:
print("Invalid input. Please enter numbers separated by commas (e.g. 1, 2, 3) or 'ALL'.")
# 5. Execute the sync for the selected playlists
if not selected_playlists:
return
print("\n=== Starting Synchronization ===")
tidal_playlists = {p.name: p for p in await get_all_playlists(tidal_session.user)}
mappings_to_sync = []
for sp_playlist in selected_playlists:
mappings_to_sync.append(_sync.pick_tidal_playlist_for_spotify_playlist(sp_playlist, tidal_playlists))
for index, (sp_playlist, td_playlist) in enumerate(mappings_to_sync):
print(f"\n[{index+1}/{len(mappings_to_sync)}] Syncing playlist: {sp_playlist['name']}")
await _sync.sync_playlist(spotify_session, tidal_session, sp_playlist, td_playlist, config)
print("\n=== Synchronization Complete ===")
# Optionally sync favorites if enabled in config
if config.get('sync_favorites_default', True):
fav_confirm = input("\nWould you like to sync your 'Liked Songs' as well? (Y/n): ").strip().lower()
if fav_confirm in ('', 'y', 'yes'):
await _sync.sync_favorites(spotify_session, tidal_session, config)
def main():
asyncio.run(main_async())
if __name__ == "__main__":
main()

View File

@@ -4,15 +4,17 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "spotify_to_tidal" name = "spotify_to_tidal"
version = "0.1.4" version = "1.0.7"
requires-python = ">= 3.10" requires-python = ">= 3.10"
dependencies = [ dependencies = [
"spotipy~=2.21.0", "spotipy~=2.24",
"tidalapi==0.7.6", "tidalapi~=0.8.10",
"pyyaml~=6.0", "pyyaml~=6.0",
"tqdm~=4.64", "tqdm~=4.64",
"sqlalchemy~=2.0" "sqlalchemy~=2.0",
"pytest~=8.0",
"pytest-mock~=3.8"
] ]
[tools.setuptools.packages."spotify_to_tidal"] [tools.setuptools.packages."spotify_to_tidal"]

6
pytest.ini Normal file
View File

@@ -0,0 +1,6 @@
[pytest]
addopts = --maxfail=1 --disable-warnings
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*

View File

@@ -19,6 +19,7 @@ Setup
Usage Usage
---- ----
To synchronize all of your Spotify playlists with your Tidal account run the following from the project root directory To synchronize all of your Spotify playlists with your Tidal account run the following from the project root directory
Windows ignores python module paths by default, but you can run them using `python3 -m spotify_to_tidal`
```bash ```bash
spotify_to_tidal spotify_to_tidal
@@ -30,6 +31,12 @@ You can also just synchronize a specific playlist by doing the following:
spotify_to_tidal --uri 1ABCDEqsABCD6EaABCDa0a # accepts playlist id or full playlist uri spotify_to_tidal --uri 1ABCDEqsABCD6EaABCDa0a # accepts playlist id or full playlist uri
``` ```
or sync just your 'Liked Songs' with:
```bash
spotify_to_tidal --sync-favorites
```
See example_config.yml for more configuration options, and `spotify_to_tidal --help` for more options. See example_config.yml for more configuration options, and `spotify_to_tidal --help` for more options.
--- ---

28
session.json Normal file
View File

@@ -0,0 +1,28 @@
{
"watched": [
"2sLEYLo3a6dDHbl4ypUkcl",
"3Sjb6ZlsPxIpzOMZVoq1pW",
"3eNcChQ2KssR8QjuV7KUEc",
"44Shnwiw8xk3UmKf7Unp6o",
"4ISU1ED9Vx8SH8o3JQVttw",
"4iVWLq7tdRfLzTbKsYhM56",
"5d49VR3snvfwyAJxJgB20s",
"7KG4kWpAQMhJxudcO42YkG"
],
"external_playlists": [
{
"id": "3ysFr4SgWB6IfzKgTij9Kj",
"name": "Down Down Down",
"description": "",
"total_tracks": 0,
"owner": "Lino Toran Jenner"
},
{
"id": "4YqSwVf1etm2ILxOwifQyv",
"name": "SunSet [Suppe]",
"description": "",
"total_tracks": 0,
"owner": "Lino Toran Jenner"
}
]
}

View File

@@ -9,6 +9,7 @@ def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--config', default='config.yml', help='location of the config file') parser.add_argument('--config', default='config.yml', help='location of the config file')
parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config') parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config')
parser.add_argument('--sync-favorites', action=argparse.BooleanOptionalAction, help='synchronize the favorites')
args = parser.parse_args() args = parser.parse_args()
with open(args.config, 'r') as f: with open(args.config, 'r') as f:
@@ -22,15 +23,23 @@ def main():
if args.uri: if args.uri:
# if a playlist ID is explicitly provided as a command line argument then use that # if a playlist ID is explicitly provided as a command line argument then use that
spotify_playlist = spotify_session.playlist(args.uri) spotify_playlist = spotify_session.playlist(args.uri)
tidal_playlists = _sync.get_tidal_playlists_dict(tidal_session) tidal_playlists = _sync.get_tidal_playlists_wrapper(tidal_session)
tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists)
_sync.sync_list(spotify_session, tidal_session, [tidal_playlist], config) _sync.sync_playlists_wrapper(spotify_session, tidal_session, [tidal_playlist], config)
sync_favorites = args.sync_favorites # only sync favorites if command line argument explicitly passed
elif args.sync_favorites:
sync_favorites = True # sync only the favorites
elif config.get('sync_playlists', None): elif config.get('sync_playlists', None):
# if the config contains a sync_playlists list of mappings then use that # if the config contains a sync_playlists list of mappings then use that
_sync.sync_list(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config) _sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True)
else: else:
# otherwise just use the user playlists in the Spotify account # otherwise sync all the user playlists in the Spotify account and favorites unless explicitly disabled
_sync.sync_list(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config) _sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True)
if sync_favorites:
_sync.sync_favorites_wrapper(spotify_session, tidal_session, config)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -11,13 +11,16 @@ __all__ = [
'open_tidal_session' 'open_tidal_session'
] ]
SPOTIFY_SCOPES = 'playlist-read-private playlist-read-collaborative user-library-read'
def open_spotify_session(config) -> spotipy.Spotify: def open_spotify_session(config) -> spotipy.Spotify:
credentials_manager = spotipy.SpotifyOAuth(username=config['username'], credentials_manager = spotipy.SpotifyOAuth(username=config['username'],
scope='playlist-read-private', scope=SPOTIFY_SCOPES,
client_id=config['client_id'], client_id=config['client_id'],
client_secret=config['client_secret'], client_secret=config['client_secret'],
redirect_uri=config['redirect_uri'], redirect_uri=config['redirect_uri'],
requests_timeout=2) requests_timeout=2,
open_browser=config.get('open_browser', True))
try: try:
credentials_manager.get_access_token(as_dict=False) credentials_manager.get_access_token(as_dict=False)
except spotipy.SpotifyOauthError: except spotipy.SpotifyOauthError:

View File

@@ -5,15 +5,16 @@ from .cache import failure_cache, track_match_cache
import datetime import datetime
from difflib import SequenceMatcher from difflib import SequenceMatcher
from functools import partial from functools import partial
from typing import List, Sequence, Set, Mapping from typing import Callable, List, Sequence, Set, Mapping
import math import math
import requests import requests
import sys import sys
import spotipy import spotipy
import tidalapi import tidalapi
from .tidalapi_patch import add_multiple_tracks_to_playlist, set_tidal_playlist from .tidalapi_patch import add_multiple_tracks_to_playlist, clear_tidal_playlist, get_all_favorites, get_all_playlists, get_all_playlist_tracks
import time import time
from tqdm.asyncio import tqdm as atqdm from tqdm.asyncio import tqdm as atqdm
from tqdm import tqdm
import traceback import traceback
import unicodedata import unicodedata
import math import math
@@ -133,20 +134,15 @@ async def tidal_search(spotify_track, rate_limiter, tidal_session: tidalapi.Sess
# if none of the search modes succeeded then store the track id to the failure cache # if none of the search modes succeeded then store the track id to the failure cache
failure_cache.cache_match_failure(spotify_track['id']) failure_cache.cache_match_failure(spotify_track['id'])
def get_tidal_playlists_dict(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
# a dictionary of name --> playlist
print("Loading Tidal playlists... This may take some time.")
tidal_playlists = tidal_session.user.playlists()
output = {}
for playlist in tidal_playlists:
output[playlist.name] = playlist
return output
async def repeat_on_request_error(function, *args, remaining=5, **kwargs): async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
# utility to repeat calling the function up to 5 times if an exception is thrown # utility to repeat calling the function up to 5 times if an exception is thrown
try: try:
return await function(*args, **kwargs) return await function(*args, **kwargs)
except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException) as e: except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException, spotipy.exceptions.SpotifyException) as e:
# 403 Forbidden: retrying will never help — fail fast to avoid hanging
if isinstance(e, spotipy.exceptions.SpotifyException) and e.http_status == 403:
raise RuntimeError(f"Access denied (403) for playlist — it may be private or collaborative: {e}") from e
if remaining: if remaining:
print(f"{str(e)} occurred, retrying {remaining} times") print(f"{str(e)} occurred, retrying {remaining} times")
else: else:
@@ -165,36 +161,51 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
time.sleep(sleep_schedule.get(remaining, 1)) time.sleep(sleep_schedule.get(remaining, 1))
return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs) return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs)
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
def _get_tracks_from_spotify_playlist(offset: int, spotify_session: spotipy.Spotify, playlist_id: str):
fields="next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc)))"
return spotify_session.playlist_tracks(playlist_id, fields, offset=offset)
async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[dict]:
output = [] output = []
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'") results = fetch_function(0)
results = _get_tracks_from_spotify_playlist( 0, spotify_session, spotify_playlist["id"] ) output.extend([item.get('track') or item.get('item') for item in results['items'] if item.get('track') or item.get('item')])
output.extend([r['track'] for r in results['items'] if r['track'] is not None])
# get all the remaining tracks in parallel # Get all the remaining tracks in parallel
if results['next']: if results['next']:
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ] offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))]
extra_results = await atqdm.gather( *[asyncio.to_thread(_get_tracks_from_spotify_playlist, offset, spotify_session=spotify_session, playlist_id=spotify_playlist["id"]) for offset in offsets ] ) extra_results = await atqdm.gather(
*[asyncio.to_thread(fetch_function, offset) for offset in offsets],
desc="Fetching additional data chunks"
)
for extra_result in extra_results: for extra_result in extra_results:
output.extend([r['track'] for r in extra_result['items'] if r['track'] is not None]) output.extend([item.get('track') or item.get('item') for item in extra_result['items'] if item.get('track') or item.get('item')])
return output return output
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str):
return spotify_session.playlist_tracks(playlist_id=playlist_id, offset=offset)
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
items = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, lambda offset: _get_tracks_from_spotify_playlist(offset=offset, playlist_id=spotify_playlist["id"]))
track_filter = lambda item: item.get('type', 'track') == 'track' # type may be 'episode' also
sanity_filter = lambda item: ('album' in item
and 'name' in item['album']
and 'artists' in item['album']
and len(item['album']['artists']) > 0
and item['album']['artists'][0]['name'] is not None)
return list(filter(sanity_filter, filter(track_filter, items)))
def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]): def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]):
""" Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """ """ Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """
def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack): def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack):
for idx, tidal_track in list(enumerate(tidal_tracks)): for idx, tidal_track in list(enumerate(tidal_tracks)):
if match(tidal_track, spotify_track): if tidal_track.available and match(tidal_track, spotify_track):
track_match_cache.insert((spotify_track['id'], tidal_track.id)) track_match_cache.insert((spotify_track['id'], tidal_track.id))
tidal_tracks.pop(idx) tidal_tracks.pop(idx)
return return
def _populate_one_track_from_tidal(tidal_track: tidalapi.Track): def _populate_one_track_from_tidal(tidal_track: tidalapi.Track):
for idx, spotify_track in list(enumerate(spotify_tracks)): for idx, spotify_track in list(enumerate(spotify_tracks)):
if match(tidal_track, spotify_track): if tidal_track.available and match(tidal_track, spotify_track):
track_match_cache.insert((spotify_track['id'], tidal_track.id)) track_match_cache.insert((spotify_track['id'], tidal_track.id))
spotify_tracks.pop(idx) spotify_tracks.pop(idx)
return return
@@ -210,9 +221,8 @@ def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack]
for track in spotify_tracks: for track in spotify_tracks:
_populate_one_track_from_spotify(track) _populate_one_track_from_spotify(track)
def get_new_tracks_from_spotify_playlist(spotify_tracks: Sequence[t_spotify.SpotifyTrack], old_tidal_tracks: Sequence[tidalapi.Track]) -> list[t_spotify.SpotifyTrack]: def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack]) -> List[t_spotify.SpotifyTrack]:
''' Extracts only the new tracks in the Spotify playlist that are not already on Tidal or known match failures ''' ''' Extracts only the tracks that have not already been seen in our Tidal caches '''
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
results = [] results = []
for spotify_track in spotify_tracks: for spotify_track in spotify_tracks:
if not spotify_track['id']: continue if not spotify_track['id']: continue
@@ -224,15 +234,22 @@ def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.Spotify
''' gets list of corresponding tidal track ids for each spotify track, ignoring duplicates ''' ''' gets list of corresponding tidal track ids for each spotify track, ignoring duplicates '''
output = [] output = []
seen_tracks = set() seen_tracks = set()
for spotify_track in spotify_tracks: for spotify_track in spotify_tracks:
if not spotify_track['id']: continue if not spotify_track['id']: continue
tidal_id = track_match_cache.get(spotify_track['id']) tidal_id = track_match_cache.get(spotify_track['id'])
if tidal_id and not tidal_id in seen_tracks: if tidal_id:
output.append(tidal_id) if tidal_id in seen_tracks:
seen_tracks.add(tidal_id) track_name = spotify_track['name']
artist_names = ', '.join([artist['name'] for artist in spotify_track['artists']])
print(f'Duplicate found: Track "{track_name}" by {artist_names} will be ignored')
else:
output.append(tidal_id)
seen_tracks.add(tidal_id)
return output return output
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config): async def search_new_tracks_on_tidal(tidal_session: tidalapi.Session, spotify_tracks: Sequence[t_spotify.SpotifyTrack], playlist_name: str, config: dict):
""" Generic function for searching for each item in a list of Spotify tracks which have not already been seen and adding them to the cache """
async def _run_rate_limiter(semaphore): async def _run_rate_limiter(semaphore):
''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit''' ''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit'''
_sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket' _sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket'
@@ -245,37 +262,55 @@ async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalap
t0 = t t0 = t
[semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket' [semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket'
# Create a new Tidal playlist if required # Extract the new tracks that do not already exist in the old tidal tracklist
if not tidal_playlist: tracks_to_search = get_new_spotify_tracks(spotify_tracks)
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])
# Extract the new tracks from the playlist that we haven't already seen before
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
old_tidal_tracks = tidal_playlist.tracks()
tracks_to_search = get_new_tracks_from_spotify_playlist(spotify_tracks, old_tidal_tracks)
if not tracks_to_search: if not tracks_to_search:
print("No new tracks to search in Spotify playlist '{}'".format(spotify_playlist['name']))
return return
# Search for each of the tracks on Tidal concurrently # Search for each of the tracks on Tidal concurrently
task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), spotify_playlist['name']) task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), playlist_name)
semaphore = asyncio.Semaphore(config.get('max_concurrency', 10)) semaphore = asyncio.Semaphore(config.get('max_concurrency', 10))
rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore)) rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore))
search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description ) search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description )
rate_limiter_task.cancel() rate_limiter_task.cancel()
# Add the search results to the cache # Add the search results to the cache
song404 = []
for idx, spotify_track in enumerate(tracks_to_search): for idx, spotify_track in enumerate(tracks_to_search):
if search_results[idx]: if search_results[idx]:
track_match_cache.insert( (spotify_track['id'], search_results[idx].id) ) track_match_cache.insert( (spotify_track['id'], search_results[idx].id) )
else: else:
song404.append(f"{spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}")
color = ('\033[91m', '\033[0m') color = ('\033[91m', '\033[0m')
print(color[0] + "Could not find track {}: {} - {}".format(spotify_track['id'], ",".join([a['name'] for a in spotify_track['artists']]), spotify_track['name']) + color[1]) print(color[0] + "Could not find the track " + song404[-1] + color[1])
file_name = "songs not found.txt"
header = f"==========================\nPlaylist: {playlist_name}\n==========================\n"
with open(file_name, "a", encoding="utf-8") as file:
file.write(header)
for song in song404:
file.write(f"{song}\n")
async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config: dict):
""" sync given playlist to tidal """
# Get the tracks from both Spotify and Tidal, creating a new Tidal playlist if necessary
spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist)
if len(spotify_tracks) == 0:
return # nothing to do
if tidal_playlist:
old_tidal_tracks = await get_all_playlist_tracks(tidal_playlist)
else:
print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist")
tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description'])
old_tidal_tracks = []
# Extract the new tracks from the playlist that we haven't already seen before
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
await search_new_tracks_on_tidal(tidal_session, spotify_tracks, spotify_playlist['name'], config)
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
# Update the Tidal playlist if there are changes # Update the Tidal playlist if there are changes
old_tidal_track_ids = [t.id for t in old_tidal_tracks] old_tidal_track_ids = [t.id for t in old_tidal_tracks]
new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks)
if new_tidal_track_ids == old_tidal_track_ids: if new_tidal_track_ids == old_tidal_track_ids:
print("No changes to write to Tidal playlist") print("No changes to write to Tidal playlist")
elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids: elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids:
@@ -283,13 +318,51 @@ async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalap
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):]) add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):])
else: else:
# Erase old playlist and add new tracks from scratch if any reordering occured # Erase old playlist and add new tracks from scratch if any reordering occured
set_tidal_playlist(tidal_playlist, new_tidal_track_ids) clear_tidal_playlist(tidal_playlist)
add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids)
def sync_list(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config): async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict):
""" sync user favorites to tidal """
async def get_tracks_from_spotify_favorites() -> List[dict]:
_get_favorite_tracks = lambda offset: spotify_session.current_user_saved_tracks(offset=offset)
tracks = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, _get_favorite_tracks)
tracks.reverse()
return tracks
def get_new_tidal_favorites() -> List[int]:
existing_favorite_ids = set([track.id for track in old_tidal_tracks])
new_ids = []
for spotify_track in spotify_tracks:
match_id = track_match_cache.get(spotify_track['id'])
if match_id and not match_id in existing_favorite_ids:
new_ids.append(match_id)
return new_ids
print("Loading favorite tracks from Spotify")
spotify_tracks = await get_tracks_from_spotify_favorites()
print("Loading existing favorite tracks from Tidal")
old_tidal_tracks = await get_all_favorites(tidal_session.user.favorites, order='DATE')
populate_track_match_cache(spotify_tracks, old_tidal_tracks)
await search_new_tracks_on_tidal(tidal_session, spotify_tracks, "Favorites", config)
new_tidal_favorite_ids = get_new_tidal_favorites()
if new_tidal_favorite_ids:
for tidal_id in tqdm(new_tidal_favorite_ids, desc="Adding new tracks to Tidal favorites"):
tidal_session.user.favorites.add_track(tidal_id)
else:
print("No new tracks to add to Tidal favorites")
def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config: dict):
for spotify_playlist, tidal_playlist in playlists: for spotify_playlist, tidal_playlist in playlists:
# sync the spotify playlist to tidal # sync the spotify playlist to tidal
asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) ) asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) )
def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config))
def get_tidal_playlists_wrapper(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]:
tidal_playlists = asyncio.run(get_all_playlists(tidal_session.user))
return {playlist.name: playlist for playlist in tidal_playlists}
def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]): def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]):
if spotify_playlist['name'] in tidal_playlists: if spotify_playlist['name'] in tidal_playlists:
# if there's an existing tidal playlist with the name of the current playlist then use that # if there's an existing tidal playlist with the name of the current playlist then use that
@@ -301,40 +374,45 @@ def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists:
def get_user_playlist_mappings(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config): def get_user_playlist_mappings(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
results = [] results = []
spotify_playlists = asyncio.run(get_playlists_from_spotify(spotify_session, config)) spotify_playlists = asyncio.run(get_playlists_from_spotify(spotify_session, config))
tidal_playlists = get_tidal_playlists_dict(tidal_session) tidal_playlists = get_tidal_playlists_wrapper(tidal_session)
for spotify_playlist in spotify_playlists: for spotify_playlist in spotify_playlists:
results.append( pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) ) results.append( pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) )
return results return results
async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config): async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config):
# get all the user playlists from the Spotify account # get all the playlists from the Spotify account
playlists = [] playlists = []
print("Loading Spotify playlists") print("Loading Spotify playlists")
results = spotify_session.user_playlists(config['spotify']['username']) first_results = spotify_session.current_user_playlists()
exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])]) exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])])
playlists.extend([p for p in results['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list]) playlists.extend([p for p in first_results['items']])
user_id = spotify_session.current_user()['id']
# get all the remaining playlists in parallel # get all the remaining playlists in parallel
if results['next']: if first_results['next']:
offsets = [ results['limit'] * n for n in range(1, math.ceil(results['total']/results['limit'])) ] offsets = [ first_results['limit'] * n for n in range(1, math.ceil(first_results['total']/first_results['limit'])) ]
extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.user_playlists, config['spotify']['username'], offset=offset) for offset in offsets ] ) extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.current_user_playlists, offset=offset) for offset in offsets ] )
for extra_result in extra_results: for extra_result in extra_results:
playlists.extend([p for p in extra_result['items'] if p['owner']['id'] == config['spotify']['username'] and not p['id'] in exclude_list]) playlists.extend([p for p in extra_result['items']])
return playlists
# filter out playlists that don't belong to us or are on the exclude list
my_playlist_filter = lambda p: p and p['owner']['id'] == user_id
exclude_filter = lambda p: not p['id'] in exclude_list
return list(filter( exclude_filter, filter( my_playlist_filter, playlists )))
def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config): def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config):
# get the list of playlist sync mappings from the configuration file # get the list of playlist sync mappings from the configuration file
def get_playlist_ids(config): def get_playlist_ids(config):
return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']] return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']]
output = [] output = []
for spotify_id, tidal_id in get_playlist_ids(config): for spotify_id, tidal_id in get_playlist_ids(config=config):
try: try:
spotify_playlist = spotify_session.playlist(spotify_id) spotify_playlist = spotify_session.playlist(playlist_id=spotify_id)
except spotipy.SpotifyException as e: except spotipy.SpotifyException as e:
print(f"Error getting Spotify playlist {spotify_id}") print(f"Error getting Spotify playlist {spotify_id}")
raise e raise e
try: try:
tidal_playlist = tidal_session.playlist(tidal_id) tidal_playlist = tidal_session.playlist(playlist_id=tidal_id)
except Exception as e: except Exception as e:
print(f"Error getting Tidal playlist {tidal_id}") print(f"Error getting Tidal playlist {tidal_id}")
raise e raise e

View File

@@ -1,6 +1,9 @@
import asyncio
import math
from typing import List from typing import List
import tidalapi import tidalapi
from tqdm import tqdm from tqdm import tqdm
from tqdm.asyncio import tqdm as atqdm
def _remove_indices_from_playlist(playlist: tidalapi.UserPlaylist, indices: List[int]): def _remove_indices_from_playlist(playlist: tidalapi.UserPlaylist, indices: List[int]):
headers = {'If-None-Match': playlist._etag} headers = {'If-None-Match': playlist._etag}
@@ -24,6 +27,53 @@ def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids:
offset += count offset += count
progress.update(count) progress.update(count)
def set_tidal_playlist(playlist: tidalapi.Playlist, track_ids: List[int]): async def _get_all_chunks(url, session, parser, params={}) -> List[tidalapi.Track]:
clear_tidal_playlist(playlist) """
add_multiple_tracks_to_playlist(playlist, track_ids) Helper function to get all items from a Tidal endpoint in parallel
The main library doesn't provide the total number of items or expose the raw json, so use this wrapper instead
"""
def _make_request(offset: int=0):
new_params = params
new_params['offset'] = offset
return session.request.map_request(url, params=new_params)
first_chunk_raw = _make_request()
limit = first_chunk_raw['limit']
total = first_chunk_raw['totalNumberOfItems']
items = session.request.map_json(first_chunk_raw, parse=parser)
if len(items) < total:
offsets = [limit * n for n in range(1, math.ceil(total/limit))]
extra_results = await atqdm.gather(
*[asyncio.to_thread(lambda offset: session.request.map_json(_make_request(offset), parse=parser), offset) for offset in offsets],
desc="Fetching additional data chunks"
)
for extra_result in extra_results:
items.extend(extra_result)
return items
async def get_all_favorites(favorites: tidalapi.Favorites, order: str = "NAME", order_direction: str = "ASC", chunk_size: int=100) -> List[tidalapi.Track]:
""" Get all favorites from Tidal playlist in chunks """
params = {
"limit": chunk_size,
"order": order,
"orderDirection": order_direction,
}
return await _get_all_chunks(f"{favorites.base_url}/tracks", session=favorites.session, parser=favorites.session.parse_track, params=params)
async def get_all_playlists(user: tidalapi.User, chunk_size: int=10) -> List[tidalapi.Playlist]:
""" Get all user playlists from Tidal in chunks """
print(f"Loading playlists from Tidal user")
params = {
"limit": chunk_size,
}
return await _get_all_chunks(f"users/{user.id}/playlists", session=user.session, parser=user.playlist.parse_factory, params=params)
async def get_all_playlist_tracks(playlist: tidalapi.Playlist, chunk_size: int=20) -> List[tidalapi.Track]:
""" Get all tracks from Tidal playlist in chunks """
params = {
"limit": chunk_size,
}
print(f"Loading tracks from Tidal playlist '{playlist.name}'")
return await _get_all_chunks(f"{playlist._base_url%playlist.id}/tracks", session=playlist.session, parser=playlist.session.parse_track, params=params)

5
tests/conftest.py Normal file
View File

@@ -0,0 +1,5 @@
import sys
import os
# Add the src directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))

0
tests/unit/__init__.py Normal file
View File

75
tests/unit/test_auth.py Normal file
View File

@@ -0,0 +1,75 @@
# tests/unit/test_auth.py
import pytest
import spotipy
import tidalapi
import yaml
import sys
from unittest import mock
from spotify_to_tidal.auth import open_spotify_session, open_tidal_session, SPOTIFY_SCOPES
def test_open_spotify_session(mocker):
# Mock the SpotifyOAuth class
mock_spotify_oauth = mocker.patch(
"spotify_to_tidal.auth.spotipy.SpotifyOAuth", autospec=True
)
mock_spotify_instance = mocker.patch(
"spotify_to_tidal.auth.spotipy.Spotify", autospec=True
)
# Define a mock configuration
mock_config = {
"username": "test_user",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"redirect_uri": "http://127.0.0.1/",
"open_browser": True,
}
# Create a mock SpotifyOAuth instance
mock_oauth_instance = mock_spotify_oauth.return_value
mock_oauth_instance.get_access_token.return_value = "mock_access_token"
# Call the function under test
spotify_instance = open_spotify_session(mock_config)
# Assert that the SpotifyOAuth was called with correct parameters
mock_spotify_oauth.assert_called_once_with(
username="test_user",
scope=SPOTIFY_SCOPES,
client_id="test_client_id",
client_secret="test_client_secret",
redirect_uri="http://127.0.0.1/",
requests_timeout=2,
open_browser=True,
)
# Assert that the Spotify instance was created
mock_spotify_instance.assert_called_once_with(oauth_manager=mock_oauth_instance)
assert spotify_instance == mock_spotify_instance.return_value
def test_open_spotify_session_oauth_error(mocker):
# Mock the SpotifyOAuth class and simulate an OAuth error
mock_spotify_oauth = mocker.patch(
"spotify_to_tidal.auth.spotipy.SpotifyOAuth", autospec=True
)
mock_spotify_oauth.return_value.get_access_token.side_effect = (
spotipy.SpotifyOauthError("mock error")
)
# Define a mock configuration
mock_config = {
"username": "test_user",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"redirect_uri": "http://127.0.0.1/",
}
# Mock sys.exit to prevent the test from exiting
mock_sys_exit = mocker.patch("sys.exit")
# Call the function under test and assert sys.exit is called
open_spotify_session(mock_config)
mock_sys_exit.assert_called_once()

80
tests/unit/test_cache.py Normal file
View File

@@ -0,0 +1,80 @@
# tests/unit/test_cache.py
import pytest
import datetime
import sqlalchemy
from sqlalchemy import create_engine, select
from unittest import mock
from spotify_to_tidal.cache import MatchFailureDatabase, TrackMatchCache
# Setup an in-memory SQLite database for testing
@pytest.fixture
def in_memory_db():
engine = create_engine("sqlite:///:memory:")
return engine
# Test MatchFailureDatabase
def test_cache_match_failure(in_memory_db, mocker):
mocker.patch(
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
)
failure_db = MatchFailureDatabase()
track_id = "test_track"
failure_db.cache_match_failure(track_id)
with failure_db.engine.connect() as connection:
result = connection.execute(
select(failure_db.match_failures).where(
failure_db.match_failures.c.track_id == track_id
)
).fetchone()
assert result is not None
assert result.track_id == track_id
def test_has_match_failure(in_memory_db, mocker):
mocker.patch(
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
)
failure_db = MatchFailureDatabase()
track_id = "test_track"
failure_db.cache_match_failure(track_id)
assert failure_db.has_match_failure(track_id) is True
def test_remove_match_failure(in_memory_db, mocker):
mocker.patch(
"spotify_to_tidal.cache.sqlalchemy.create_engine", return_value=in_memory_db
)
failure_db = MatchFailureDatabase()
track_id = "test_track"
failure_db.cache_match_failure(track_id)
failure_db.remove_match_failure(track_id)
with failure_db.engine.connect() as connection:
result = connection.execute(
select(failure_db.match_failures).where(
failure_db.match_failures.c.track_id == track_id
)
).fetchone()
assert result is None
# Test TrackMatchCache
def test_track_match_cache_insert():
track_cache = TrackMatchCache()
track_cache.insert(("spotify_id", 123))
assert track_cache.get("spotify_id") == 123
def test_track_match_cache_get():
track_cache = TrackMatchCache()
track_cache.insert(("spotify_id", 123))
assert track_cache.get("spotify_id") == 123
assert track_cache.get("nonexistent_id") is None