Files
spotify-to-tidal/app.py

418 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import math
import yaml
import asyncio
import pandas as pd
import streamlit as st
from pathlib import Path
import spotipy
from spotify_to_tidal import auth as _auth
from spotify_to_tidal import sync as _sync
from spotify_to_tidal.tidalapi_patch import get_all_playlists
# ── Constants ─────────────────────────────────────────────────────────────────
SESSION_FILE = Path("session.json")
NOT_FOUND_FILE = Path("songs not found.txt")
# ── Session persistence ───────────────────────────────────────────────────────
def load_session() -> dict:
if SESSION_FILE.exists():
try:
return json.loads(SESSION_FILE.read_text())
except Exception:
pass
return {"watched": [], "external_playlists": []}
def save_session(data: dict):
SESSION_FILE.write_text(json.dumps(data, indent=2))
def load_watched() -> set:
return set(load_session().get("watched", []))
def save_watched(ids: set):
data = load_session()
data["watched"] = sorted(ids)
save_session(data)
def load_external_playlists() -> list:
return load_session().get("external_playlists", [])
def upsert_external_playlist(info: dict):
data = load_session()
ext = [p for p in data.get("external_playlists", []) if p.get("id") != info["id"]]
ext.append(info)
data["external_playlists"] = ext
save_session(data)
def remove_external_playlist(pid: str):
data = load_session()
data["external_playlists"] = [p for p in data.get("external_playlists", []) if p.get("id") != pid]
save_session(data)
# ── Not-found parser ──────────────────────────────────────────────────────────
def parse_not_found() -> dict:
"""Returns {playlist_name: [(artist, title), ...]}"""
result = {}
if not NOT_FOUND_FILE.exists():
return result
current = None
for raw in NOT_FOUND_FILE.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("=="):
continue
if line.startswith("Playlist:"):
current = line[len("Playlist:"):].strip()
result.setdefault(current, [])
elif current is not None and ": " in line:
_, rest = line.split(": ", 1)
artist, title = (rest.split(" - ", 1) if " - " in rest else ("", rest))
result[current].append((artist.strip(), title.strip()))
return result
# ── Page config ───────────────────────────────────────────────────────────────
st.set_page_config(page_title="Spotify → Tidal Sync", layout="wide")
st.title("🎵 Spotify → Tidal Sync")
# ── Auth ──────────────────────────────────────────────────────────────────────
@st.cache_resource
def get_sessions():
try:
with open("config.yml", "r") as f:
config = yaml.safe_load(f)
except Exception as e:
st.error(f"Failed to load config.yml: {e}"); st.stop()
try:
sp = _auth.open_spotify_session(config["spotify"])
td = _auth.open_tidal_session()
if not td.check_login():
st.error("Could not connect to Tidal."); st.stop()
return config, sp, td
except Exception as e:
st.error(f"Authentication failed: {e}"); st.stop()
config, spotify_session, tidal_session = get_sessions()
# ── Data fetchers ─────────────────────────────────────────────────────────────
@st.cache_data
def fetch_my_playlists():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(_sync.get_playlists_from_spotify(spotify_session, config))
loop.close()
return result
@st.cache_data(show_spinner=False)
def fetch_playlist_tracks(pid: str, name: str) -> tuple:
"""Returns (tracks: list, error: str|None). Uses direct API — no sys.exit retry wrapper."""
try:
all_tracks = []
result = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=0)
if result is None:
return [], "Empty response from Spotify."
def _extract(r):
return [
item["track"] for item in (r.get("items") or [])
if item.get("track") and item["track"].get("id")
]
all_tracks.extend(_extract(result))
total = result.get("total", 0)
if result.get("next") and total > 50:
for offset in range(50, total, 50):
page = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=offset)
if page:
all_tracks.extend(_extract(page))
return all_tracks, None
except spotipy.SpotifyException as e:
if e.http_status == 403:
return [], "🔒 Access denied (403) — this playlist may be private or collaborative."
return [], f"Spotify error {e.http_status}: {e.msg}"
except Exception as e:
return [], f"Error loading tracks: {e}"
@st.cache_data(show_spinner=False)
def fetch_single_playlist(pid: str) -> dict | None:
try:
return spotify_session.playlist(pid, fields="id,name,description,tracks.total,owner")
except Exception as e:
print(f"[ERROR] fetch_single_playlist: {e}")
return None
# ── Sync helper ───────────────────────────────────────────────────────────────
def run_sync_playlists(playlists: list):
async def _do():
tidal_map = {p.name: p for p in await get_all_playlists(tidal_session.user)}
total = len(playlists)
bar = st.progress(0, text=f"Starting sync of {total} playlist(s)…")
skipped = []
for i, sp in enumerate(playlists):
bar.progress(i / total, text=f"[{i+1}/{total}] {sp['name']}")
try:
await _sync.sync_playlist(spotify_session, tidal_session, sp, tidal_map.get(sp["name"]), config)
except RuntimeError as e:
skipped.append((sp["name"], str(e)))
print(f"[SKIP] {sp['name']}: {e}")
bar.progress(1.0, text="✅ Done!")
return skipped
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
skipped = loop.run_until_complete(_do())
loop.close()
return skipped or []
# ── Load state ────────────────────────────────────────────────────────────────
with st.spinner("Fetching your Spotify playlists…"):
spotify_playlists = fetch_my_playlists()
if not spotify_playlists:
st.warning("No Spotify playlists found!"); st.stop()
watched_ids = load_watched()
not_found = parse_not_found()
total_nf = sum(len(v) for v in not_found.values())
# ── Tabs ──────────────────────────────────────────────────────────────────────
tab_my, tab_ext, tab_nf = st.tabs([
f"🎵 My Playlists ({len(spotify_playlists)})",
"🔗 External Playlists",
f"❌ Not Found ({total_nf} tracks)",
])
# ════════════════════════════════════════════════════════
# TAB 1 MY PLAYLISTS (expandable rows)
# ════════════════════════════════════════════════════════
with tab_my:
# Collect playlists selected for sync via session_state checkboxes
sync_btn_col, wtch_btn_col, _ = st.columns([2, 2, 4])
# We need to render the list first so state keys exist, then read them
# ── Column headers ────────────────────────────────
hdr = st.columns([6, 1, 1, 1])
hdr[0].markdown("**Playlist**")
hdr[1].markdown("**Tracks**")
hdr[2].markdown("**Watch**")
hdr[3].markdown("**Sync**")
st.divider()
current_watched = load_watched() # fresh from disk
# Sort: watched first, then rest
sorted_playlists = (
[p for p in spotify_playlists if p["id"] in current_watched] +
[p for p in spotify_playlists if p["id"] not in current_watched]
)
showed_watched_header = False
showed_rest_header = False
for p in sorted_playlists:
pid = p["id"]
name = p["name"]
nf_list = not_found.get(name, [])
is_watched = pid in current_watched
tracks_key = f"tracks_{pid}"
# Section headers
if is_watched and not showed_watched_header:
st.markdown("### 👁 Watched Playlists")
showed_watched_header = True
elif not is_watched and not showed_rest_header:
if showed_watched_header:
st.divider()
st.markdown("### 🎵 All Playlists")
showed_rest_header = True
# Use cached tracks from session_state if already loaded
cached_tracks: list | None = st.session_state.get(tracks_key)
track_count = len(cached_tracks) if cached_tracks is not None else None
# Build expander label
count_str = str(track_count) if track_count is not None else ""
prefix = "" if is_watched else ""
warn = " ⚠️" if nf_list else ""
label = f"{prefix}{name}{count_str} tracks{warn}"
with st.expander(label, expanded=False):
left, right = st.columns([3, 1])
with right:
# Watch toggle
new_watch = st.checkbox("👁 Watch", value=is_watched, key=f"watch_{pid}")
if new_watch != is_watched:
if new_watch:
current_watched.add(pid)
else:
current_watched.discard(pid)
save_watched(current_watched)
st.rerun()
# Sync checkbox (persists in session_state automatically)
st.checkbox("▶ Add to sync", key=f"sync_{pid}")
with left:
if cached_tracks is None:
if st.button("🔍 Load tracks", key=f"load_{pid}"):
with st.spinner(f"Loading '{name}'"):
tracks, err = fetch_playlist_tracks(pid, name)
st.session_state[tracks_key] = tracks
if err:
st.session_state[f"tracks_err_{pid}"] = err
st.rerun()
else:
err = st.session_state.get(f"tracks_err_{pid}")
if err:
st.warning(err)
elif cached_tracks:
nf_titles = {title for _, title in nf_list}
rows = [
{
"": "" if t["name"] in nf_titles else "",
"Title": t["name"],
"Artist": ", ".join(a["name"] for a in t.get("artists", [])),
}
for t in cached_tracks
]
st.dataframe(
pd.DataFrame(rows),
hide_index=True,
use_container_width=True,
height=min(40 + len(rows) * 35, 300),
column_config={
"": st.column_config.TextColumn("", width="small"),
"Title": st.column_config.TextColumn("Title"),
"Artist": st.column_config.TextColumn("Artist"),
},
)
if nf_list:
st.caption(f"⚠️ {len(nf_list)} track(s) not found on Tidal")
else:
st.info("No tracks found for this playlist.")
st.divider()
# ── Gather selected playlists & action buttons ────
sel_pls = [p for p in spotify_playlists if st.session_state.get(f"sync_{p['id']}", False)]
wtch_pls = [p for p in spotify_playlists if p["id"] in load_watched()]
with sync_btn_col:
if st.button(f"▶ Sync Selected ({len(sel_pls)})", disabled=not sel_pls, type="primary", width="stretch"):
skipped = run_sync_playlists(sel_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(sel_pls) - len(skipped)
if synced:
st.success(f"{synced} playlist(s) synced!")
st.balloons()
with wtch_btn_col:
if st.button(f"👁 Sync Watched ({len(wtch_pls)})", disabled=not wtch_pls, width="stretch"):
skipped = run_sync_playlists(wtch_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(wtch_pls) - len(skipped)
if synced:
st.success(f"{synced} watched playlist(s) synced!")
st.balloons()
# ════════════════════════════════════════════════════════
# TAB 2 EXTERNAL PLAYLISTS
# ════════════════════════════════════════════════════════
with tab_ext:
st.subheader("Add a friend's playlist")
url_input = st.text_input(
"Spotify URL or ID",
placeholder="https://open.spotify.com/playlist/… or 3ktReNcDeGj94W59UXHU9R",
label_visibility="collapsed",
)
if st.button(" Add Playlist", type="primary"):
pid = url_input.strip()
if "spotify.com/playlist/" in pid:
pid = pid.split("spotify.com/playlist/")[-1].split("?")[0].strip()
if pid:
with st.spinner("Looking up playlist…"):
info = fetch_single_playlist(pid)
if info:
tf = info.get("tracks") or {}
upsert_external_playlist({
"id": info["id"],
"name": info["name"],
"description": info.get("description", ""),
"total_tracks": tf.get("total", 0) if isinstance(tf, dict) else 0,
"owner": (info.get("owner") or {}).get("display_name", ""),
})
st.success(f"✅ Added **{info['name']}**")
st.rerun()
else:
st.error("Playlist not found. Is it public? Check the URL/ID.")
else:
st.warning("Please enter a URL or ID.")
external = load_external_playlists()
if external:
st.subheader(f"Saved ({len(external)})")
ext_rows = [{"▶ Sync": False, "Playlist": p["name"], "Owner": p.get("owner",""), "Tracks": p.get("total_tracks",0), "ID": p["id"]} for p in external]
edit_ext = st.data_editor(
pd.DataFrame(ext_rows),
column_config={
"▶ Sync": st.column_config.CheckboxColumn("▶ Sync", width="small"),
"Playlist": st.column_config.TextColumn("Playlist", disabled=True),
"Owner": st.column_config.TextColumn("Owner", disabled=True, width="medium"),
"Tracks": st.column_config.NumberColumn("Tracks", disabled=True, width="small"),
"ID": st.column_config.TextColumn("Spotify ID", disabled=True, width="medium"),
},
hide_index=True, width="stretch", key="ext_editor",
)
ext_sel = [p for p in external if p["id"] in set(edit_ext[edit_ext["▶ Sync"]]["ID"].tolist())]
if st.button(f"▶ Sync Selected ({len(ext_sel)})", disabled=not ext_sel, type="primary", width="stretch", key="btn_sync_ext"):
with st.spinner("Fetching playlist details…"):
full_pls = [spotify_session.playlist(p["id"]) for p in ext_sel]
skipped = run_sync_playlists(full_pls)
for name, err in skipped:
st.warning(f"⚠️ Skipped **{name}**: {err}")
synced = len(full_pls) - len(skipped)
if synced:
st.success(f"{synced} external playlist(s) synced!")
st.divider()
rm = st.selectbox("Remove", ["— select —"] + [p["name"] for p in external], key="rm_ext")
if rm != "— select —" and st.button("🗑 Remove", type="secondary"):
rid = next((p["id"] for p in external if p["name"] == rm), None)
if rid:
remove_external_playlist(rid)
fetch_single_playlist.clear()
st.rerun()
else:
st.info("No external playlists added yet.")
# ════════════════════════════════════════════════════════
# TAB 3 NOT FOUND
# ════════════════════════════════════════════════════════
with tab_nf:
if not not_found:
st.success("✅ No unmatched tracks yet — run a sync first.")
else:
st.info(f"**{total_nf}** track(s) across **{len(not_found)}** playlist(s) couldn't be matched on Tidal.")
flt = st.selectbox("Filter by playlist", ["All playlists"] + sorted(not_found.keys()), key="nf_filter")
nf_rows = [
{"Playlist": pl, "Artist": a, "Title": t}
for pl, items in sorted(not_found.items())
if flt in ("All playlists", pl)
for a, t in items
]
if nf_rows:
st.dataframe(pd.DataFrame(nf_rows), hide_index=True, use_container_width=True,
height=min(80 + len(nf_rows) * 35, 500))
else:
st.info("No entries for this filter.")
# ── Liked Songs (opt-in) ──────────────────────────────────────────────────────
with st.expander("⭐ Sync Liked Songs (optional)", expanded=False):
st.write("Sync your Spotify 'Liked Songs' to your Tidal favorites.")
if st.button("Sync Liked Songs Now", type="secondary"):
with st.spinner("Syncing liked songs…"):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(_sync.sync_favorites(spotify_session, tidal_session, config))
loop.close()
st.success("✅ Liked songs synced!")