13 Commits
1.0.3 ... main

Author SHA1 Message Date
2f762eed4b Standalone App; Works fine but only for MY OWN PLAYLISTS due to spotify dev rights :-( 2026-03-15 20:35:46 +01:00
Tim Rae
8a66f5f864 Fix bug in pyproject.toml: use tilde operator 2026-01-11 11:24:32 +01:00
Tim Rae
299cc3209f Don't specify point version of spotify and tidal APIs
So we don't have to put out a new release to update the dependency
2025-12-31 15:01:30 +01:00
Tim Rae
46771aefc8 Bump version to 1.0.6 2025-12-31 14:59:03 +01:00
Tim Rae
eb745eab6f Update tidalapi version 2025-12-31 14:54:44 +01:00
Alex Camilleri
9ff181ae6e Playlist name in txt output (#140) 2025-10-20 09:36:04 +02:00
Tim Rae
d90c5bac41 Update tidalapi dependency version to 0.8.8 (#136)
Fixes #135 #134
2025-10-16 12:22:15 +02:00
Axel
e62a8a80cb use 127.0.0.1 instead of localhost to meet new spotify URI criteria (#116) 2025-04-09 22:14:15 +02:00
Darkasf
a5afd975f0 Update readme.md (#113) 2025-03-30 06:51:13 +02:00
Johan Lindbergh
03e0396ac0 Add another check to the spotify track sanity filter (#111)
Discovered a crash when syncing a playlist with some random podcast
episode. It crashed because both ['artists'][0]['name'] and
['album']['artists'][0]['name'] was None.

I thought the episode would be caught by the track_filter, but
apparently having the 'type': 'episode' isn't reliable enough :/

Not sure if the check should be in the sanity_filter or any of
the _search_for... methods though.
2025-02-13 22:50:00 +01:00
Tim Rae
a438cda72b Bump version to 1.0.4 2024-12-04 06:09:12 +09:00
Tim Rae
2f1985a42b Check for null playlists in filter 2024-12-04 05:46:30 +09:00
Tim Rae
bcf2bbca0d Add option to do auth manually on headless server (#88) 2024-10-26 11:12:47 +02:00
11 changed files with 603 additions and 19 deletions

1
.gitignore vendored
View File

@@ -3,6 +3,7 @@ config.yml
config.yaml
.cache*
.session.yml
songs not found.txt
# Byte-compiled / optimized / DLL files
__pycache__/

11
Launch Spotify→Tidal.command Executable file
View File

@@ -0,0 +1,11 @@
#!/bin/zsh
# Double-click this file in Finder to launch the Spotify→Tidal sync app.
# Change to the project directory (same folder as this script)
cd "$(dirname "$0")"
# Activate the virtual environment
source .venv/bin/activate
# Launch Streamlit (opens browser automatically)
streamlit run app.py

417
app.py Normal file
View File

@@ -0,0 +1,417 @@
import json
import math
import yaml
import asyncio
import pandas as pd
import streamlit as st
from pathlib import Path
import spotipy
from spotify_to_tidal import auth as _auth
from spotify_to_tidal import sync as _sync
from spotify_to_tidal.tidalapi_patch import get_all_playlists
# ── Constants ─────────────────────────────────────────────────────────────────
SESSION_FILE = Path("session.json")
NOT_FOUND_FILE = Path("songs not found.txt")
# ── Session persistence ───────────────────────────────────────────────────────
def load_session() -> dict:
if SESSION_FILE.exists():
try:
return json.loads(SESSION_FILE.read_text())
except Exception:
pass
return {"watched": [], "external_playlists": []}
def save_session(data: dict):
SESSION_FILE.write_text(json.dumps(data, indent=2))
def load_watched() -> set:
return set(load_session().get("watched", []))
def save_watched(ids: set):
data = load_session()
data["watched"] = sorted(ids)
save_session(data)
def load_external_playlists() -> list:
return load_session().get("external_playlists", [])
def upsert_external_playlist(info: dict):
data = load_session()
ext = [p for p in data.get("external_playlists", []) if p.get("id") != info["id"]]
ext.append(info)
data["external_playlists"] = ext
save_session(data)
def remove_external_playlist(pid: str):
data = load_session()
data["external_playlists"] = [p for p in data.get("external_playlists", []) if p.get("id") != pid]
save_session(data)
# ── Not-found parser ──────────────────────────────────────────────────────────
def parse_not_found() -> dict:
"""Returns {playlist_name: [(artist, title), ...]}"""
result = {}
if not NOT_FOUND_FILE.exists():
return result
current = None
for raw in NOT_FOUND_FILE.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("=="):
continue
if line.startswith("Playlist:"):
current = line[len("Playlist:"):].strip()
result.setdefault(current, [])
elif current is not None and ": " in line:
_, rest = line.split(": ", 1)
artist, title = (rest.split(" - ", 1) if " - " in rest else ("", rest))
result[current].append((artist.strip(), title.strip()))
return result
# ── Page config ───────────────────────────────────────────────────────────────
st.set_page_config(page_title="Spotify → Tidal Sync", layout="wide")
st.title("🎵 Spotify → Tidal Sync")
# ── Auth ──────────────────────────────────────────────────────────────────────
@st.cache_resource
def get_sessions():
try:
with open("config.yml", "r") as f:
config = yaml.safe_load(f)
except Exception as e:
st.error(f"Failed to load config.yml: {e}"); st.stop()
try:
sp = _auth.open_spotify_session(config["spotify"])
td = _auth.open_tidal_session()
if not td.check_login():
st.error("Could not connect to Tidal."); st.stop()
return config, sp, td
except Exception as e:
st.error(f"Authentication failed: {e}"); st.stop()
config, spotify_session, tidal_session = get_sessions()
# ── Data fetchers ─────────────────────────────────────────────────────────────
@st.cache_data
def fetch_my_playlists():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(_sync.get_playlists_from_spotify(spotify_session, config))
loop.close()
return result
@st.cache_data(show_spinner=False)
def fetch_playlist_tracks(pid: str, name: str) -> tuple:
"""Returns (tracks: list, error: str|None). Uses direct API — no sys.exit retry wrapper."""
try:
all_tracks = []
result = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=0)
if result is None:
return [], "Empty response from Spotify."
def _extract(r):
return [
item["track"] for item in (r.get("items") or [])
if item.get("track") and item["track"].get("id")
]
all_tracks.extend(_extract(result))
total = result.get("total", 0)
if result.get("next") and total > 50:
for offset in range(50, total, 50):
page = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=offset)
if page:
all_tracks.extend(_extract(page))
return all_tracks, None
except spotipy.SpotifyException as e:
if e.http_status == 403:
return [], "🔒 Access denied (403) — this playlist may be private or collaborative."
return [], f"Spotify error {e.http_status}: {e.msg}"
except Exception as e:
return [], f"Error loading tracks: {e}"
@st.cache_data(show_spinner=False)
def fetch_single_playlist(pid: str) -> dict | None:
try:
return spotify_session.playlist(pid, fields="id,name,description,tracks.total,owner")
except Exception as e:
print(f"[ERROR] fetch_single_playlist: {e}")
return None
# ── Sync helper ───────────────────────────────────────────────────────────────
def run_sync_playlists(playlists: list):
async def _do():
tidal_map = {p.name: p for p in await get_all_playlists(tidal_session.user)}
total = len(playlists)
bar = st.progress(0, text=f"Starting sync of {total} playlist(s)…")
skipped = []
for i, sp in enumerate(playlists):
bar.progress(i / total, text=f"[{i+1}/{total}] {sp['name']}")
try:
await _sync.sync_playlist(spotify_session, tidal_session, sp, tidal_map.get(sp["name"]), config)
except RuntimeError as e:
skipped.append((sp["name"], str(e)))
print(f"[SKIP] {sp['name']}: {e}")
bar.progress(1.0, text="✅ Done!")
return skipped
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
skipped = loop.run_until_complete(_do())
loop.close()
return skipped or []
# ── Load state ────────────────────────────────────────────────────────────────
with st.spinner("Fetching your Spotify playlists…"):
spotify_playlists = fetch_my_playlists()
if not spotify_playlists:
st.warning("No Spotify playlists found!"); st.stop()
watched_ids = load_watched()
not_found = parse_not_found()
total_nf = sum(len(v) for v in not_found.values())
# ── Tabs ──────────────────────────────────────────────────────────────────────
tab_my, tab_ext, tab_nf = st.tabs([
f"🎵 My Playlists ({len(spotify_playlists)})",
"🔗 External Playlists",
f"❌ Not Found ({total_nf} tracks)",
])
# ════════════════════════════════════════════════════════
# TAB 1 MY PLAYLISTS (expandable rows)
# ════════════════════════════════════════════════════════
with tab_my:
# Collect playlists selected for sync via session_state checkboxes
sync_btn_col, wtch_btn_col, _ = st.columns([2, 2, 4])
# We need to render the list first so state keys exist, then read them
# ── Column headers ────────────────────────────────
hdr = st.columns([6, 1, 1, 1])
hdr[0].markdown("**Playlist**")
hdr[1].markdown("**Tracks**")
hdr[2].markdown("**Watch**")
hdr[3].markdown("**Sync**")
st.divider()
current_watched = load_watched() # fresh from disk
# Sort: watched first, then rest
sorted_playlists = (
[p for p in spotify_playlists if p["id"] in current_watched] +
[p for p in spotify_playlists if p["id"] not in current_watched]
)
showed_watched_header = False
showed_rest_header = False
for p in sorted_playlists:
pid = p["id"]
name = p["name"]
nf_list = not_found.get(name, [])
is_watched = pid in current_watched
tracks_key = f"tracks_{pid}"
# Section headers
if is_watched and not showed_watched_header:
st.markdown("### 👁 Watched Playlists")
showed_watched_header = True
elif not is_watched and not showed_rest_header:
if showed_watched_header:
st.divider()
st.markdown("### 🎵 All Playlists")
showed_rest_header = True
# Use cached tracks from session_state if already loaded
cached_tracks: list | None = st.session_state.get(tracks_key)
track_count = len(cached_tracks) if cached_tracks is not None else None
# Build expander label
count_str = str(track_count) if track_count is not None else ""
prefix = "" if is_watched else ""
warn = " ⚠️" if nf_list else ""
label = f"{prefix}{name}{count_str} tracks{warn}"
with st.expander(label, expanded=False):
left, right = st.columns([3, 1])
with right:
# Watch toggle
new_watch = st.checkbox("👁 Watch", value=is_watched, key=f"watch_{pid}")
if new_watch != is_watched:
if new_watch:
current_watched.add(pid)
else:
current_watched.discard(pid)
save_watched(current_watched)
st.rerun()
# Sync checkbox (persists in session_state automatically)
st.checkbox("▶ Add to sync", key=f"sync_{pid}")
with left:
if cached_tracks is None:
if st.button("🔍 Load tracks", key=f"load_{pid}"):
with st.spinner(f"Loading '{name}'"):
tracks, err = fetch_playlist_tracks(pid, name)
st.session_state[tracks_key] = tracks
if err:
st.session_state[f"tracks_err_{pid}"] = err
st.rerun()
else:
err = st.session_state.get(f"tracks_err_{pid}")
if err:
st.warning(err)
elif cached_tracks:
nf_titles = {title for _, title in nf_list}
rows = [
{
"": "" if t["name"] in nf_titles else "",
"Title": t["name"],
"Artist": ", ".join(a["name"] for a in t.get("artists", [])),
}
for t in cached_tracks
]
st.dataframe(
pd.DataFrame(rows),
hide_index=True,
use_container_width=True,
height=min(40 + len(rows) * 35, 300),
column_config={
"": st.column_config.TextColumn("", width="small"),
"Title": st.column_config.TextColumn("Title"),
"Artist": st.column_config.TextColumn("Artist"),
},
)
if nf_list:
st.caption(f"⚠️ {len(nf_list)} track(s) not found on Tidal")
else:
st.info("No tracks found for this playlist.")
st.divider()
# ── Gather selected playlists & action buttons ────
sel_pls = [p for p in spotify_playlists if st.session_state.get(f"sync_{p['id']}", False)]
wtch_pls = [p for p in spotify_playlists if p["id"] in load_watched()]
with sync_btn_col:
if st.button(f"▶ Sync Selected ({len(sel_pls)})", disabled=not sel_pls, type="primary", width="stretch"):
skipped = run_sync_playlists(sel_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(sel_pls) - len(skipped)
if synced:
st.success(f"{synced} playlist(s) synced!")
st.balloons()
with wtch_btn_col:
if st.button(f"👁 Sync Watched ({len(wtch_pls)})", disabled=not wtch_pls, width="stretch"):
skipped = run_sync_playlists(wtch_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(wtch_pls) - len(skipped)
if synced:
st.success(f"{synced} watched playlist(s) synced!")
st.balloons()
# ════════════════════════════════════════════════════════
# TAB 2 EXTERNAL PLAYLISTS
# ════════════════════════════════════════════════════════
with tab_ext:
st.subheader("Add a friend's playlist")
url_input = st.text_input(
"Spotify URL or ID",
placeholder="https://open.spotify.com/playlist/… or 3ktReNcDeGj94W59UXHU9R",
label_visibility="collapsed",
)
if st.button(" Add Playlist", type="primary"):
pid = url_input.strip()
if "spotify.com/playlist/" in pid:
pid = pid.split("spotify.com/playlist/")[-1].split("?")[0].strip()
if pid:
with st.spinner("Looking up playlist…"):
info = fetch_single_playlist(pid)
if info:
tf = info.get("tracks") or {}
upsert_external_playlist({
"id": info["id"],
"name": info["name"],
"description": info.get("description", ""),
"total_tracks": tf.get("total", 0) if isinstance(tf, dict) else 0,
"owner": (info.get("owner") or {}).get("display_name", ""),
})
st.success(f"✅ Added **{info['name']}**")
st.rerun()
else:
st.error("Playlist not found. Is it public? Check the URL/ID.")
else:
st.warning("Please enter a URL or ID.")
external = load_external_playlists()
if external:
st.subheader(f"Saved ({len(external)})")
ext_rows = [{"▶ Sync": False, "Playlist": p["name"], "Owner": p.get("owner",""), "Tracks": p.get("total_tracks",0), "ID": p["id"]} for p in external]
edit_ext = st.data_editor(
pd.DataFrame(ext_rows),
column_config={
"▶ Sync": st.column_config.CheckboxColumn("▶ Sync", width="small"),
"Playlist": st.column_config.TextColumn("Playlist", disabled=True),
"Owner": st.column_config.TextColumn("Owner", disabled=True, width="medium"),
"Tracks": st.column_config.NumberColumn("Tracks", disabled=True, width="small"),
"ID": st.column_config.TextColumn("Spotify ID", disabled=True, width="medium"),
},
hide_index=True, width="stretch", key="ext_editor",
)
ext_sel = [p for p in external if p["id"] in set(edit_ext[edit_ext["▶ Sync"]]["ID"].tolist())]
if st.button(f"▶ Sync Selected ({len(ext_sel)})", disabled=not ext_sel, type="primary", width="stretch", key="btn_sync_ext"):
with st.spinner("Fetching playlist details…"):
full_pls = [spotify_session.playlist(p["id"]) for p in ext_sel]
skipped = run_sync_playlists(full_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(full_pls) - len(skipped)
if synced:
st.success(f"{synced} external playlist(s) synced!")
st.divider()
rm = st.selectbox("Remove", ["— select —"] + [p["name"] for p in external], key="rm_ext")
if rm != "— select —" and st.button("🗑 Remove", type="secondary"):
rid = next((p["id"] for p in external if p["name"] == rm), None)
if rid:
remove_external_playlist(rid)
fetch_single_playlist.clear()
st.rerun()
else:
st.info("No external playlists added yet.")
# ════════════════════════════════════════════════════════
# TAB 3 NOT FOUND
# ════════════════════════════════════════════════════════
with tab_nf:
if not not_found:
st.success("✅ No unmatched tracks yet — run a sync first.")
else:
st.info(f"**{total_nf}** track(s) across **{len(not_found)}** playlist(s) couldn't be matched on Tidal.")
flt = st.selectbox("Filter by playlist", ["All playlists"] + sorted(not_found.keys()), key="nf_filter")
nf_rows = [
{"Playlist": pl, "Artist": a, "Title": t}
for pl, items in sorted(not_found.items())
if flt in ("All playlists", pl)
for a, t in items
]
if nf_rows:
st.dataframe(pd.DataFrame(nf_rows), hide_index=True, use_container_width=True,
height=min(80 + len(nf_rows) * 35, 500))
else:
st.info("No entries for this filter.")
# ── Liked Songs (opt-in) ──────────────────────────────────────────────────────
with st.expander("⭐ Sync Liked Songs (optional)", expanded=False):
st.write("Sync your Spotify 'Liked Songs' to your Tidal favorites.")
if st.button("Sync Liked Songs Now", type="secondary"):
with st.spinner("Syncing liked songs…"):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(_sync.sync_favorites(spotify_session, tidal_session, config))
loop.close()
st.success("✅ Liked songs synced!")

View File

@@ -2,7 +2,8 @@ spotify:
client_id: your_client_id
client_secret: your_client_secret
username: your_spotify_username
redirect_uri: http://localhost:8888/callback
redirect_uri: http://127.0.0.1:8888/callback
open_browser: True # Set to False if using a headless server environment
# uncomment this block if you want to only sync specific playlist IDs

113
interactive_sync.py Normal file
View File

@@ -0,0 +1,113 @@
import yaml
import sys
import asyncio
from typing import List, Dict, Any
from spotify_to_tidal import auth as _auth
from spotify_to_tidal import sync as _sync
from spotify_to_tidal.tidalapi_patch import get_all_playlists
def print_menu(playlists: List[Dict[str, Any]]):
print("\n" + "="*50)
print(f"🎵 Spotify Playlists Found ({len(playlists)} total) 🎵")
print("="*50)
for i, p in enumerate(playlists):
track_count = p.get('tracks', {}).get('total', 'Unknown')
print(f"[{i+1:2d}] {p['name']:<35} ({track_count:>3} tracks) | ID: {p['id']}")
print("="*50)
print("Enter the numbers of the playlists you want to sync, separated by commas (e.g., 1, 4, 12).")
print("Enter 'ALL' to sync all playlists.")
print("Enter 'Q' to quit.")
async def main_async():
# 1. Load config
try:
with open('config.yml', 'r') as f:
config = yaml.safe_load(f)
except Exception as e:
sys.exit(f"Failed to load config.yml: {e}")
# 2. Authenticate
print("Authenticating with Spotify...")
spotify_session = _auth.open_spotify_session(config['spotify'])
print("Authenticating with Tidal...")
tidal_session = _auth.open_tidal_session()
if not tidal_session.check_login():
sys.exit("Could not connect to Tidal")
# 3. Fetch all Spotify user playlists
print("\nFetching Spotify playlists...")
spotify_playlists = await _sync.get_playlists_from_spotify(spotify_session, config)
if not spotify_playlists:
print("No Spotify playlists found!")
return
# 4. Interactive menu loop
while True:
print_menu(spotify_playlists)
choice = input("\nYour choice: ").strip().lower()
if choice == 'q':
print("Exiting...")
return
selected_playlists = []
if choice == 'all':
selected_playlists = spotify_playlists
print(f"Selected ALL {len(selected_playlists)} playlists for synchronization.")
break
else:
try:
# Parse comma-separated indices
indices = [int(x.strip()) - 1 for x in choice.split(',')]
for idx in indices:
if 0 <= idx < len(spotify_playlists):
selected_playlists.append(spotify_playlists[idx])
else:
print(f"Warning: Number {idx + 1} is out of range, ignoring.")
if not selected_playlists:
print("No valid playlists selected. Please try again.")
continue
print(f"Selected {len(selected_playlists)} playlists for synchronization.")
for p in selected_playlists:
print(f" - {p['name']}")
confirm = input("Proceed? (Y/n): ").strip().lower()
if confirm in ('', 'y', 'yes'):
break
except ValueError:
print("Invalid input. Please enter numbers separated by commas (e.g. 1, 2, 3) or 'ALL'.")
# 5. Execute the sync for the selected playlists
if not selected_playlists:
return
print("\n=== Starting Synchronization ===")
tidal_playlists = {p.name: p for p in await get_all_playlists(tidal_session.user)}
mappings_to_sync = []
for sp_playlist in selected_playlists:
mappings_to_sync.append(_sync.pick_tidal_playlist_for_spotify_playlist(sp_playlist, tidal_playlists))
for index, (sp_playlist, td_playlist) in enumerate(mappings_to_sync):
print(f"\n[{index+1}/{len(mappings_to_sync)}] Syncing playlist: {sp_playlist['name']}")
await _sync.sync_playlist(spotify_session, tidal_session, sp_playlist, td_playlist, config)
print("\n=== Synchronization Complete ===")
# Optionally sync favorites if enabled in config
if config.get('sync_favorites_default', True):
fav_confirm = input("\nWould you like to sync your 'Liked Songs' as well? (Y/n): ").strip().lower()
if fav_confirm in ('', 'y', 'yes'):
await _sync.sync_favorites(spotify_session, tidal_session, config)
def main():
asyncio.run(main_async())
if __name__ == "__main__":
main()

View File

@@ -4,12 +4,12 @@ build-backend = "setuptools.build_meta"
[project]
name = "spotify_to_tidal"
version = "1.0.3"
version = "1.0.7"
requires-python = ">= 3.10"
dependencies = [
"spotipy~=2.24.0",
"tidalapi==0.7.6",
"spotipy~=2.24",
"tidalapi~=0.8.10",
"pyyaml~=6.0",
"tqdm~=4.64",
"sqlalchemy~=2.0",

View File

@@ -19,6 +19,7 @@ Setup
Usage
----
To synchronize all of your Spotify playlists with your Tidal account run the following from the project root directory
Windows ignores python module paths by default, but you can run them using `python3 -m spotify_to_tidal`
```bash
spotify_to_tidal

28
session.json Normal file
View File

@@ -0,0 +1,28 @@
{
"watched": [
"2sLEYLo3a6dDHbl4ypUkcl",
"3Sjb6ZlsPxIpzOMZVoq1pW",
"3eNcChQ2KssR8QjuV7KUEc",
"44Shnwiw8xk3UmKf7Unp6o",
"4ISU1ED9Vx8SH8o3JQVttw",
"4iVWLq7tdRfLzTbKsYhM56",
"5d49VR3snvfwyAJxJgB20s",
"7KG4kWpAQMhJxudcO42YkG"
],
"external_playlists": [
{
"id": "3ysFr4SgWB6IfzKgTij9Kj",
"name": "Down Down Down",
"description": "",
"total_tracks": 0,
"owner": "Lino Toran Jenner"
},
{
"id": "4YqSwVf1etm2ILxOwifQyv",
"name": "SunSet [Suppe]",
"description": "",
"total_tracks": 0,
"owner": "Lino Toran Jenner"
}
]
}

View File

@@ -11,7 +11,7 @@ __all__ = [
'open_tidal_session'
]
SPOTIFY_SCOPES = 'playlist-read-private, user-library-read'
SPOTIFY_SCOPES = 'playlist-read-private playlist-read-collaborative user-library-read'
def open_spotify_session(config) -> spotipy.Spotify:
credentials_manager = spotipy.SpotifyOAuth(username=config['username'],
@@ -19,7 +19,8 @@ def open_spotify_session(config) -> spotipy.Spotify:
client_id=config['client_id'],
client_secret=config['client_secret'],
redirect_uri=config['redirect_uri'],
requests_timeout=2)
requests_timeout=2,
open_browser=config.get('open_browser', True))
try:
credentials_manager.get_access_token(as_dict=False)
except spotipy.SpotifyOauthError:

View File

@@ -139,6 +139,10 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
try:
return await function(*args, **kwargs)
except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException, spotipy.exceptions.SpotifyException) as e:
# 403 Forbidden: retrying will never help — fail fast to avoid hanging
if isinstance(e, spotipy.exceptions.SpotifyException) and e.http_status == 403:
raise RuntimeError(f"Access denied (403) for playlist — it may be private or collaborative: {e}") from e
if remaining:
print(f"{str(e)} occurred, retrying {remaining} times")
else:
@@ -161,7 +165,7 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[dict]:
output = []
results = fetch_function(0)
output.extend([item['track'] for item in results['items'] if item['track'] is not None])
output.extend([item.get('track') or item.get('item') for item in results['items'] if item.get('track') or item.get('item')])
# Get all the remaining tracks in parallel
if results['next']:
@@ -171,20 +175,23 @@ async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[di
desc="Fetching additional data chunks"
)
for extra_result in extra_results:
output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None])
output.extend([item.get('track') or item.get('item') for item in extra_result['items'] if item.get('track') or item.get('item')])
return output
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str):
fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc))),type"
return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset)
return spotify_session.playlist_tracks(playlist_id=playlist_id, offset=offset)
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
items = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, lambda offset: _get_tracks_from_spotify_playlist(offset=offset, playlist_id=spotify_playlist["id"]))
track_filter = lambda item: item.get('type', 'track') == 'track' # type may be 'episode' also
sanity_filter = lambda item: 'album' in item and 'name' in item['album'] and 'artists' in item['album'] and len(item['album']['artists']) > 0
sanity_filter = lambda item: ('album' in item
and 'name' in item['album']
and 'artists' in item['album']
and len(item['album']['artists']) > 0
and item['album']['artists'][0]['name'] is not None)
return list(filter(sanity_filter, filter(track_filter, items)))
def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]):
@@ -277,7 +284,9 @@ async def search_new_tracks_on_tidal(tidal_session: tidalapi.Session, spotify_tr
color = ('\033[91m', '\033[0m')
print(color[0] + "Could not find the track " + song404[-1] + color[1])
file_name = "songs not found.txt"
header = f"==========================\nPlaylist: {playlist_name}\n==========================\n"
with open(file_name, "a", encoding="utf-8") as file:
file.write(header)
for song in song404:
file.write(f"{song}\n")
@@ -387,7 +396,7 @@ async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config):
playlists.extend([p for p in extra_result['items']])
# filter out playlists that don't belong to us or are on the exclude list
my_playlist_filter = lambda p: p['owner']['id'] == user_id
my_playlist_filter = lambda p: p and p['owner']['id'] == user_id
exclude_filter = lambda p: not p['id'] in exclude_list
return list(filter( exclude_filter, filter( my_playlist_filter, playlists )))

View File

@@ -23,7 +23,8 @@ def test_open_spotify_session(mocker):
"username": "test_user",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"redirect_uri": "http://localhost/",
"redirect_uri": "http://127.0.0.1/",
"open_browser": True,
}
# Create a mock SpotifyOAuth instance
@@ -39,8 +40,9 @@ def test_open_spotify_session(mocker):
scope=SPOTIFY_SCOPES,
client_id="test_client_id",
client_secret="test_client_secret",
redirect_uri="http://localhost/",
redirect_uri="http://127.0.0.1/",
requests_timeout=2,
open_browser=True,
)
# Assert that the Spotify instance was created
@@ -62,7 +64,7 @@ def test_open_spotify_session_oauth_error(mocker):
"username": "test_user",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"redirect_uri": "http://localhost/",
"redirect_uri": "http://127.0.0.1/",
}
# Mock sys.exit to prevent the test from exiting