import json
import math
import yaml
import asyncio
import pandas as pd
import streamlit as st
from pathlib import Path
import spotipy
from spotify_to_tidal import auth as _auth
from spotify_to_tidal import sync as _sync
from spotify_to_tidal.tidalapi_patch import get_all_playlists

# ── Constants ─────────────────────────────────────────────────────────────────
SESSION_FILE = Path("session.json")
NOT_FOUND_FILE = Path("songs not found.txt")


# ── Session persistence ───────────────────────────────────────────────────────
def load_session() -> dict:
    """Read the persisted UI state from SESSION_FILE.

    Returns the parsed JSON object, or a fresh default state when the file is
    missing, unreadable, not valid JSON, or does not contain a JSON object.
    """
    default = {"watched": [], "external_playlists": []}
    try:
        data = json.loads(SESSION_FILE.read_text())
    except FileNotFoundError:
        # First run: nothing has been saved yet.
        return default
    except (OSError, ValueError) as e:
        # ValueError covers both JSON syntax errors and text decoding errors.
        # Stay best-effort (fall back to defaults) but leave a trace.
        print(f"[WARN] Could not read {SESSION_FILE}: {e}")
        return default
    # Every caller uses .get(), so anything but a dict would crash later.
    return data if isinstance(data, dict) else default


def save_session(data: dict):
    """Write *data* to SESSION_FILE as indented JSON."""
    SESSION_FILE.write_text(json.dumps(data, indent=2))


def load_watched() -> set:
    """Return the set of playlist ids stored under the "watched" key."""
    return set(load_session().get("watched") or [])


def save_watched(ids: set):
    """Persist *ids* (sorted) under "watched", keeping other session keys."""
    data = load_session()
    data["watched"] = sorted(ids)
    save_session(data)


def load_external_playlists() -> list:
    """Return the list of saved external playlist records."""
    return load_session().get("external_playlists") or []


def upsert_external_playlist(info: dict):
    """Add *info* to the saved external playlists, replacing any record with the same "id"."""
    data = load_session()
    ext = [p for p in (data.get("external_playlists") or []) if p.get("id") != info["id"]]
    ext.append(info)
    data["external_playlists"] = ext
    save_session(data)


def remove_external_playlist(pid: str):
    """Drop the saved external playlist whose "id" equals *pid*, if present."""
    data = load_session()
    data["external_playlists"] = [
        p for p in (data.get("external_playlists") or []) if p.get("id") != pid
    ]
    save_session(data)


# ── Not-found parser ──────────────────────────────────────────────────────────
def parse_not_found() -> dict:
    """Parse NOT_FOUND_FILE into {playlist_name: [(artist, title), ...]}.

    Blank lines and lines starting with "==" are ignored. A line starting with
    "Playlist:" opens a new section. Inside a section, a line containing ": "
    is split there and the right-hand part is read as "artist - title"; when
    no " - " is present the artist is left empty. Returns an empty dict when
    the file does not exist.
    """
    result = {}
    if not NOT_FOUND_FILE.exists():
        return result
    current = None
    for raw in NOT_FOUND_FILE.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("=="):
            continue
        if line.startswith("Playlist:"):
            current = line[len("Playlist:"):].strip()
            result.setdefault(current, [])
        elif current is not None and ": " in line:
            _, rest = line.split(": ", 1)
            artist, title = rest.split(" - ", 1) if " - " in rest else ("", rest)
            result[current].append((artist.strip(), title.strip()))
    return result


# ── Page config ───────────────────────────────────────────────────────────────
st.set_page_config(page_title="Spotify → Tidal Sync", layout="wide")
st.title("🎵 Spotify → Tidal Sync")


# ── Auth ──────────────────────────────────────────────────────────────────────
@st.cache_resource
def get_sessions():
    """Load config.yml and open the Spotify and Tidal sessions.

    Returns a ``(config, spotify_session, tidal_session)`` tuple. On any
    failure an error is shown in the page and the script run is stopped.
    """
    try:
        with open("config.yml", "r") as f:
            config = yaml.safe_load(f)
    except Exception as e:
        st.error(f"Failed to load config.yml: {e}")
        st.stop()
    try:
        sp = _auth.open_spotify_session(config["spotify"])
        td = _auth.open_tidal_session()
        if not td.check_login():
            st.error("Could not connect to Tidal.")
            st.stop()
        return config, sp, td
    except Exception as e:
        st.error(f"Authentication failed: {e}")
        st.stop()


config, spotify_session, tidal_session = get_sessions()


# ── Async helper ──────────────────────────────────────────────────────────────
def _run_async(coro):
    """Run *coro* to completion on a fresh event loop and return its result.

    The loop is closed even when the coroutine raises, so a failed call does
    not leave an open loop behind.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


# ── Data fetchers ─────────────────────────────────────────────────────────────
@st.cache_data
def fetch_my_playlists():
    """Return the result of ``_sync.get_playlists_from_spotify`` (cached)."""
    return _run_async(_sync.get_playlists_from_spotify(spotify_session, config))


@st.cache_data(show_spinner=False)
def fetch_playlist_tracks(pid: str, name: str) -> tuple:
    """Returns (tracks: list, error: str|None). Uses direct API — no sys.exit retry wrapper.

    Pages through the playlist 50 items at a time and keeps only items that
    carry a track with an id. Failures are reported through the second tuple
    element instead of being raised.
    """
    def _extract(r):
        # Skip entries without a track object or without a track id.
        return [
            item["track"]
            for item in (r.get("items") or [])
            if item.get("track") and item["track"].get("id")
        ]

    try:
        all_tracks = []
        result = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=0)
        if result is None:
            return [], "Empty response from Spotify."

        all_tracks.extend(_extract(result))
        total = result.get("total", 0)
        if result.get("next") and total > 50:
            # The first page is already consumed; fetch the remaining ones.
            for offset in range(50, total, 50):
                page = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=offset)
                if page:
                    all_tracks.extend(_extract(page))
        return all_tracks, None
    except spotipy.SpotifyException as e:
        if e.http_status == 403:
            return [], "🔒 Access denied (403) — this playlist may be private or collaborative."
        return [], f"Spotify error {e.http_status}: {e.msg}"
    except Exception as e:
        return [], f"Error loading tracks: {e}"


@st.cache_data(show_spinner=False)
def fetch_single_playlist(pid: str) -> dict | None:
    """Look up one playlist's summary fields; returns None when the lookup fails."""
    try:
        return spotify_session.playlist(pid, fields="id,name,description,tracks.total,owner")
    except Exception as e:
        print(f"[ERROR] fetch_single_playlist: {e}")
        return None


# ── Sync helper ───────────────────────────────────────────────────────────────
def run_sync_playlists(playlists: list):
    """Sync each Spotify playlist in *playlists* to Tidal, showing a progress bar.

    Tidal playlists are matched to Spotify playlists by name. A playlist whose
    sync raises ``RuntimeError`` is skipped and reported; other exceptions
    propagate. Returns a list of ``(playlist_name, error_message)`` tuples for
    the skipped playlists.
    """
    async def _do():
        tidal_map = {p.name: p for p in await get_all_playlists(tidal_session.user)}
        total = len(playlists)
        bar = st.progress(0, text=f"Starting sync of {total} playlist(s)…")
        skipped = []
        for i, sp in enumerate(playlists):
            bar.progress(i / total, text=f"[{i+1}/{total}] {sp['name']}")
            try:
                await _sync.sync_playlist(
                    spotify_session, tidal_session, sp, tidal_map.get(sp["name"]), config
                )
            except RuntimeError as e:
                skipped.append((sp["name"], str(e)))
                print(f"[SKIP] {sp['name']}: {e}")
        bar.progress(1.0, text="✅ Done!")
        return skipped

    return _run_async(_do()) or []


# ── Load state ────────────────────────────────────────────────────────────────
with st.spinner("Fetching your Spotify playlists…"):
    spotify_playlists = fetch_my_playlists()
if not spotify_playlists:
    st.warning("No Spotify playlists found!")
    st.stop()
watched_ids = load_watched()
not_found = parse_not_found()
total_nf = sum(len(v) for v in not_found.values())


def _sync_and_report(playlists: list, label: str = "", celebrate: bool = False) -> None:
    """Run a sync over *playlists* and render the outcome in the current container.

    Shows one warning per skipped playlist and, when at least one playlist was
    not skipped, a success message. *label* is inserted before the word
    "playlist(s)" in that message (e.g. ``"watched "``); *celebrate* adds
    balloons on success.
    """
    skipped = run_sync_playlists(playlists)
    for skipped_name, reason in skipped:
        st.warning(f"⚠️ Skipped **{skipped_name}**: {reason}")
    synced = len(playlists) - len(skipped)
    if synced:
        st.success(f"✅ {synced} {label}playlist(s) synced!")
        if celebrate:
            st.balloons()


# ── Tabs ──────────────────────────────────────────────────────────────────────
tab_my, tab_ext, tab_nf = st.tabs([
    f"🎵 My Playlists ({len(spotify_playlists)})",
    "🔗 External Playlists",
    f"❌ Not Found ({total_nf} tracks)",
])

# ════════════════════════════════════════════════════════
# TAB 1 – MY PLAYLISTS  (expandable rows)
# ════════════════════════════════════════════════════════
with tab_my:
    # The action buttons live in these columns at the top of the tab, but they
    # are filled in only after the list below has rendered, so the per-playlist
    # checkbox keys already exist in session_state when they are read.
    sync_btn_col, wtch_btn_col, _ = st.columns([2, 2, 4])

    # ── Column headers ────────────────────────────────
    hdr = st.columns([6, 1, 1, 1])
    hdr[0].markdown("**Playlist**")
    hdr[1].markdown("**Tracks**")
    hdr[2].markdown("**Watch**")
    hdr[3].markdown("**Sync**")
    st.divider()

    current_watched = load_watched()  # fresh from disk

    # Sort: watched first, then rest
    sorted_playlists = (
        [p for p in spotify_playlists if p["id"] in current_watched]
        + [p for p in spotify_playlists if p["id"] not in current_watched]
    )

    showed_watched_header = False
    showed_rest_header = False

    for p in sorted_playlists:
        pid = p["id"]
        name = p["name"]
        nf_list = not_found.get(name, [])
        is_watched = pid in current_watched
        tracks_key = f"tracks_{pid}"

        # Section headers: each is emitted once, before its first playlist.
        if is_watched and not showed_watched_header:
            st.markdown("### 👁 Watched Playlists")
            showed_watched_header = True
        elif not is_watched and not showed_rest_header:
            if showed_watched_header:
                st.divider()
            st.markdown("### 🎵 All Playlists")
            showed_rest_header = True

        # Use cached tracks from session_state if already loaded
        cached_tracks: list | None = st.session_state.get(tracks_key)
        track_count = len(cached_tracks) if cached_tracks is not None else None

        # Build expander label
        count_str = str(track_count) if track_count is not None else "…"
        prefix = "★ " if is_watched else ""
        warn = " ⚠️" if nf_list else ""
        label = f"{prefix}{name} — {count_str} tracks{warn}"

        with st.expander(label, expanded=False):
            left, right = st.columns([3, 1])

            with right:
                # Watch toggle: persist the change and rerun so the playlist
                # moves to the right section.
                new_watch = st.checkbox("👁 Watch", value=is_watched, key=f"watch_{pid}")
                if new_watch != is_watched:
                    if new_watch:
                        current_watched.add(pid)
                    else:
                        current_watched.discard(pid)
                    save_watched(current_watched)
                    st.rerun()

                # Sync checkbox (persists in session_state automatically)
                st.checkbox("▶ Add to sync", key=f"sync_{pid}")

            with left:
                if cached_tracks is None:
                    if st.button("🔍 Load tracks", key=f"load_{pid}"):
                        with st.spinner(f"Loading '{name}'…"):
                            tracks, err = fetch_playlist_tracks(pid, name)
                        st.session_state[tracks_key] = tracks
                        if err:
                            st.session_state[f"tracks_err_{pid}"] = err
                        st.rerun()
                else:
                    err = st.session_state.get(f"tracks_err_{pid}")
                    if err:
                        st.warning(err)
                    elif cached_tracks:
                        # Tracks are flagged by title only; the not-found list
                        # is matched against the Spotify track name.
                        nf_titles = {title for _, title in nf_list}
                        rows = [
                            {
                                "": "❌" if t["name"] in nf_titles else "✅",
                                "Title": t["name"],
                                "Artist": ", ".join(a["name"] for a in t.get("artists", [])),
                            }
                            for t in cached_tracks
                        ]
                        st.dataframe(
                            pd.DataFrame(rows),
                            hide_index=True,
                            use_container_width=True,
                            height=min(40 + len(rows) * 35, 300),
                            column_config={
                                "": st.column_config.TextColumn("", width="small"),
                                "Title": st.column_config.TextColumn("Title"),
                                "Artist": st.column_config.TextColumn("Artist"),
                            },
                        )
                        if nf_list:
                            st.caption(f"⚠️ {len(nf_list)} track(s) not found on Tidal")
                    else:
                        st.info("No tracks found for this playlist.")

    st.divider()

    # ── Gather selected playlists & action buttons ────
    sel_pls = [p for p in spotify_playlists if st.session_state.get(f"sync_{p['id']}", False)]
    # Read the watched set once instead of re-reading the session file for
    # every playlist in the comprehension.
    watched_now = load_watched()
    wtch_pls = [p for p in spotify_playlists if p["id"] in watched_now]

    with sync_btn_col:
        if st.button(f"▶ Sync Selected ({len(sel_pls)})",
                     disabled=not sel_pls, type="primary", width="stretch"):
            _sync_and_report(sel_pls, celebrate=True)

    with wtch_btn_col:
        if st.button(f"👁 Sync Watched ({len(wtch_pls)})",
                     disabled=not wtch_pls, width="stretch"):
            _sync_and_report(wtch_pls, label="watched ", celebrate=True)

# ════════════════════════════════════════════════════════
# TAB 2 – EXTERNAL PLAYLISTS
# ════════════════════════════════════════════════════════
with tab_ext:
    st.subheader("Add a friend's playlist")
    url_input = st.text_input(
        "Spotify URL or ID",
        placeholder="https://open.spotify.com/playlist/… or 3ktReNcDeGj94W59UXHU9R",
        label_visibility="collapsed",
    )
    if st.button("➕ Add Playlist", type="primary"):
        pid = url_input.strip()
        if "spotify.com/playlist/" in pid:
            # Keep only the id: drop the query string and any trailing slash.
            pid = pid.split("spotify.com/playlist/")[-1].split("?")[0].strip().rstrip("/")
        if pid:
            with st.spinner("Looking up playlist…"):
                info = fetch_single_playlist(pid)
            if info:
                tf = info.get("tracks") or {}
                upsert_external_playlist({
                    "id": info["id"],
                    "name": info["name"],
                    "description": info.get("description", ""),
                    "total_tracks": tf.get("total", 0) if isinstance(tf, dict) else 0,
                    "owner": (info.get("owner") or {}).get("display_name", ""),
                })
                st.success(f"✅ Added **{info['name']}**")
                st.rerun()
            else:
                st.error("Playlist not found. Is it public? Check the URL/ID.")
        else:
            st.warning("Please enter a URL or ID.")

    external = load_external_playlists()
    if external:
        st.subheader(f"Saved ({len(external)})")
        ext_rows = [
            {
                "▶ Sync": False,
                "Playlist": p["name"],
                "Owner": p.get("owner", ""),
                "Tracks": p.get("total_tracks", 0),
                "ID": p["id"],
            }
            for p in external
        ]
        edit_ext = st.data_editor(
            pd.DataFrame(ext_rows),
            column_config={
                "▶ Sync": st.column_config.CheckboxColumn("▶ Sync", width="small"),
                "Playlist": st.column_config.TextColumn("Playlist", disabled=True),
                "Owner": st.column_config.TextColumn("Owner", disabled=True, width="medium"),
                "Tracks": st.column_config.NumberColumn("Tracks", disabled=True, width="small"),
                "ID": st.column_config.TextColumn("Spotify ID", disabled=True, width="medium"),
            },
            hide_index=True,
            width="stretch",
            key="ext_editor",
        )
        # Build the set of ticked ids once rather than once per playlist.
        selected_ids = set(edit_ext[edit_ext["▶ Sync"]]["ID"].tolist())
        ext_sel = [p for p in external if p["id"] in selected_ids]

        if st.button(f"▶ Sync Selected ({len(ext_sel)})",
                     disabled=not ext_sel, type="primary", width="stretch", key="btn_sync_ext"):
            # The saved records hold only summary fields, so the full playlist
            # objects are fetched before syncing.
            with st.spinner("Fetching playlist details…"):
                full_pls = [spotify_session.playlist(p["id"]) for p in ext_sel]
            _sync_and_report(full_pls, label="external ")

        st.divider()
        rm = st.selectbox("Remove", ["— select —"] + [p["name"] for p in external], key="rm_ext")
        if rm != "— select —" and st.button("🗑 Remove", type="secondary"):
            # Removal is by name; with duplicate names the first match wins.
            rid = next((p["id"] for p in external if p["name"] == rm), None)
            if rid:
                remove_external_playlist(rid)
                fetch_single_playlist.clear()
                st.rerun()
    else:
        st.info("No external playlists added yet.")

# ════════════════════════════════════════════════════════
# TAB 3 – NOT FOUND
# ════════════════════════════════════════════════════════
with tab_nf:
    if not not_found:
        st.success("✅ No unmatched tracks yet — run a sync first.")
    else:
        st.info(f"**{total_nf}** track(s) across **{len(not_found)}** playlist(s) couldn't be matched on Tidal.")
        flt = st.selectbox("Filter by playlist",
                           ["All playlists"] + sorted(not_found.keys()), key="nf_filter")
        nf_rows = [
            {"Playlist": pl, "Artist": a, "Title": t}
            for pl, items in sorted(not_found.items())
            if flt in ("All playlists", pl)
            for a, t in items
        ]
        if nf_rows:
            st.dataframe(pd.DataFrame(nf_rows), hide_index=True,
                         use_container_width=True,
                         height=min(80 + len(nf_rows) * 35, 500))
        else:
            st.info("No entries for this filter.")

# ── Liked Songs (opt-in) ──────────────────────────────────────────────────────
with st.expander("⭐ Sync Liked Songs (optional)", expanded=False):
    st.write("Sync your Spotify 'Liked Songs' to your Tidal favorites.")
    if st.button("Sync Liked Songs Now", type="secondary"):
        with st.spinner("Syncing liked songs…"):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(
                    _sync.sync_favorites(spotify_session, tidal_session, config)
                )
            finally:
                # Close the loop even if the sync raises.
                loop.close()
        st.success("✅ Liked songs synced!")