Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2f762eed4b | |||
|
|
8a66f5f864 |
11
Launch Spotify→Tidal.command
Executable file
11
Launch Spotify→Tidal.command
Executable file
@@ -0,0 +1,11 @@
|
||||
#!/bin/zsh
|
||||
# Double-click this file in Finder to launch the Spotify→Tidal sync app.
|
||||
|
||||
# Change to the project directory (same folder as this script)
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
# Activate the virtual environment
|
||||
source .venv/bin/activate
|
||||
|
||||
# Launch Streamlit (opens browser automatically)
|
||||
streamlit run app.py
|
||||
417
app.py
Normal file
417
app.py
Normal file
@@ -0,0 +1,417 @@
|
||||
import json
|
||||
import math
|
||||
import yaml
|
||||
import asyncio
|
||||
import pandas as pd
|
||||
import streamlit as st
|
||||
from pathlib import Path
|
||||
import spotipy
|
||||
|
||||
from spotify_to_tidal import auth as _auth
|
||||
from spotify_to_tidal import sync as _sync
|
||||
from spotify_to_tidal.tidalapi_patch import get_all_playlists
|
||||
|
||||
# ── Constants ─────────────────────────────────────────────────────────────────
|
||||
SESSION_FILE = Path("session.json")
|
||||
NOT_FOUND_FILE = Path("songs not found.txt")
|
||||
|
||||
# ── Session persistence ───────────────────────────────────────────────────────
|
||||
def load_session() -> dict:
|
||||
if SESSION_FILE.exists():
|
||||
try:
|
||||
return json.loads(SESSION_FILE.read_text())
|
||||
except Exception:
|
||||
pass
|
||||
return {"watched": [], "external_playlists": []}
|
||||
|
||||
def save_session(data: dict):
|
||||
SESSION_FILE.write_text(json.dumps(data, indent=2))
|
||||
|
||||
def load_watched() -> set:
|
||||
return set(load_session().get("watched", []))
|
||||
|
||||
def save_watched(ids: set):
|
||||
data = load_session()
|
||||
data["watched"] = sorted(ids)
|
||||
save_session(data)
|
||||
|
||||
def load_external_playlists() -> list:
|
||||
return load_session().get("external_playlists", [])
|
||||
|
||||
def upsert_external_playlist(info: dict):
|
||||
data = load_session()
|
||||
ext = [p for p in data.get("external_playlists", []) if p.get("id") != info["id"]]
|
||||
ext.append(info)
|
||||
data["external_playlists"] = ext
|
||||
save_session(data)
|
||||
|
||||
def remove_external_playlist(pid: str):
|
||||
data = load_session()
|
||||
data["external_playlists"] = [p for p in data.get("external_playlists", []) if p.get("id") != pid]
|
||||
save_session(data)
|
||||
|
||||
# ── Not-found parser ──────────────────────────────────────────────────────────
|
||||
def parse_not_found() -> dict:
|
||||
"""Returns {playlist_name: [(artist, title), ...]}"""
|
||||
result = {}
|
||||
if not NOT_FOUND_FILE.exists():
|
||||
return result
|
||||
current = None
|
||||
for raw in NOT_FOUND_FILE.read_text(encoding="utf-8").splitlines():
|
||||
line = raw.strip()
|
||||
if not line or line.startswith("=="):
|
||||
continue
|
||||
if line.startswith("Playlist:"):
|
||||
current = line[len("Playlist:"):].strip()
|
||||
result.setdefault(current, [])
|
||||
elif current is not None and ": " in line:
|
||||
_, rest = line.split(": ", 1)
|
||||
artist, title = (rest.split(" - ", 1) if " - " in rest else ("", rest))
|
||||
result[current].append((artist.strip(), title.strip()))
|
||||
return result
|
||||
|
||||
# ── Page config ───────────────────────────────────────────────────────────────
|
||||
st.set_page_config(page_title="Spotify → Tidal Sync", layout="wide")
|
||||
st.title("🎵 Spotify → Tidal Sync")
|
||||
|
||||
# ── Auth ──────────────────────────────────────────────────────────────────────
|
||||
@st.cache_resource
|
||||
def get_sessions():
|
||||
try:
|
||||
with open("config.yml", "r") as f:
|
||||
config = yaml.safe_load(f)
|
||||
except Exception as e:
|
||||
st.error(f"Failed to load config.yml: {e}"); st.stop()
|
||||
try:
|
||||
sp = _auth.open_spotify_session(config["spotify"])
|
||||
td = _auth.open_tidal_session()
|
||||
if not td.check_login():
|
||||
st.error("Could not connect to Tidal."); st.stop()
|
||||
return config, sp, td
|
||||
except Exception as e:
|
||||
st.error(f"Authentication failed: {e}"); st.stop()
|
||||
|
||||
config, spotify_session, tidal_session = get_sessions()
|
||||
|
||||
# ── Data fetchers ─────────────────────────────────────────────────────────────
|
||||
@st.cache_data
|
||||
def fetch_my_playlists():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
result = loop.run_until_complete(_sync.get_playlists_from_spotify(spotify_session, config))
|
||||
loop.close()
|
||||
return result
|
||||
|
||||
@st.cache_data(show_spinner=False)
|
||||
def fetch_playlist_tracks(pid: str, name: str) -> tuple:
|
||||
"""Returns (tracks: list, error: str|None). Uses direct API — no sys.exit retry wrapper."""
|
||||
try:
|
||||
all_tracks = []
|
||||
result = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=0)
|
||||
if result is None:
|
||||
return [], "Empty response from Spotify."
|
||||
def _extract(r):
|
||||
return [
|
||||
item["track"] for item in (r.get("items") or [])
|
||||
if item.get("track") and item["track"].get("id")
|
||||
]
|
||||
all_tracks.extend(_extract(result))
|
||||
total = result.get("total", 0)
|
||||
if result.get("next") and total > 50:
|
||||
for offset in range(50, total, 50):
|
||||
page = spotify_session.playlist_tracks(playlist_id=pid, limit=50, offset=offset)
|
||||
if page:
|
||||
all_tracks.extend(_extract(page))
|
||||
return all_tracks, None
|
||||
except spotipy.SpotifyException as e:
|
||||
if e.http_status == 403:
|
||||
return [], "🔒 Access denied (403) — this playlist may be private or collaborative."
|
||||
return [], f"Spotify error {e.http_status}: {e.msg}"
|
||||
except Exception as e:
|
||||
return [], f"Error loading tracks: {e}"
|
||||
|
||||
@st.cache_data(show_spinner=False)
|
||||
def fetch_single_playlist(pid: str) -> dict | None:
|
||||
try:
|
||||
return spotify_session.playlist(pid, fields="id,name,description,tracks.total,owner")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] fetch_single_playlist: {e}")
|
||||
return None
|
||||
|
||||
# ── Sync helper ───────────────────────────────────────────────────────────────
|
||||
def run_sync_playlists(playlists: list):
|
||||
async def _do():
|
||||
tidal_map = {p.name: p for p in await get_all_playlists(tidal_session.user)}
|
||||
total = len(playlists)
|
||||
bar = st.progress(0, text=f"Starting sync of {total} playlist(s)…")
|
||||
skipped = []
|
||||
for i, sp in enumerate(playlists):
|
||||
bar.progress(i / total, text=f"[{i+1}/{total}] {sp['name']}")
|
||||
try:
|
||||
await _sync.sync_playlist(spotify_session, tidal_session, sp, tidal_map.get(sp["name"]), config)
|
||||
except RuntimeError as e:
|
||||
skipped.append((sp["name"], str(e)))
|
||||
print(f"[SKIP] {sp['name']}: {e}")
|
||||
bar.progress(1.0, text="✅ Done!")
|
||||
return skipped
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
skipped = loop.run_until_complete(_do())
|
||||
loop.close()
|
||||
return skipped or []
|
||||
|
||||
# ── Load state ────────────────────────────────────────────────────────────────
|
||||
with st.spinner("Fetching your Spotify playlists…"):
|
||||
spotify_playlists = fetch_my_playlists()
|
||||
|
||||
if not spotify_playlists:
|
||||
st.warning("No Spotify playlists found!"); st.stop()
|
||||
|
||||
watched_ids = load_watched()
|
||||
not_found = parse_not_found()
|
||||
total_nf = sum(len(v) for v in not_found.values())
|
||||
|
||||
# ── Tabs ──────────────────────────────────────────────────────────────────────
|
||||
tab_my, tab_ext, tab_nf = st.tabs([
|
||||
f"🎵 My Playlists ({len(spotify_playlists)})",
|
||||
"🔗 External Playlists",
|
||||
f"❌ Not Found ({total_nf} tracks)",
|
||||
])
|
||||
|
||||
# ════════════════════════════════════════════════════════
|
||||
# TAB 1 – MY PLAYLISTS (expandable rows)
|
||||
# ════════════════════════════════════════════════════════
|
||||
with tab_my:
|
||||
# Collect playlists selected for sync via session_state checkboxes
|
||||
sync_btn_col, wtch_btn_col, _ = st.columns([2, 2, 4])
|
||||
|
||||
# We need to render the list first so state keys exist, then read them
|
||||
# ── Column headers ────────────────────────────────
|
||||
hdr = st.columns([6, 1, 1, 1])
|
||||
hdr[0].markdown("**Playlist**")
|
||||
hdr[1].markdown("**Tracks**")
|
||||
hdr[2].markdown("**Watch**")
|
||||
hdr[3].markdown("**Sync**")
|
||||
st.divider()
|
||||
|
||||
current_watched = load_watched() # fresh from disk
|
||||
|
||||
# Sort: watched first, then rest
|
||||
sorted_playlists = (
|
||||
[p for p in spotify_playlists if p["id"] in current_watched] +
|
||||
[p for p in spotify_playlists if p["id"] not in current_watched]
|
||||
)
|
||||
showed_watched_header = False
|
||||
showed_rest_header = False
|
||||
|
||||
for p in sorted_playlists:
|
||||
pid = p["id"]
|
||||
name = p["name"]
|
||||
nf_list = not_found.get(name, [])
|
||||
is_watched = pid in current_watched
|
||||
tracks_key = f"tracks_{pid}"
|
||||
|
||||
# Section headers
|
||||
if is_watched and not showed_watched_header:
|
||||
st.markdown("### 👁 Watched Playlists")
|
||||
showed_watched_header = True
|
||||
elif not is_watched and not showed_rest_header:
|
||||
if showed_watched_header:
|
||||
st.divider()
|
||||
st.markdown("### 🎵 All Playlists")
|
||||
showed_rest_header = True
|
||||
|
||||
# Use cached tracks from session_state if already loaded
|
||||
cached_tracks: list | None = st.session_state.get(tracks_key)
|
||||
track_count = len(cached_tracks) if cached_tracks is not None else None
|
||||
|
||||
# Build expander label
|
||||
count_str = str(track_count) if track_count is not None else "…"
|
||||
prefix = "★ " if is_watched else ""
|
||||
warn = " ⚠️" if nf_list else ""
|
||||
label = f"{prefix}{name} — {count_str} tracks{warn}"
|
||||
|
||||
with st.expander(label, expanded=False):
|
||||
left, right = st.columns([3, 1])
|
||||
|
||||
with right:
|
||||
# Watch toggle
|
||||
new_watch = st.checkbox("👁 Watch", value=is_watched, key=f"watch_{pid}")
|
||||
if new_watch != is_watched:
|
||||
if new_watch:
|
||||
current_watched.add(pid)
|
||||
else:
|
||||
current_watched.discard(pid)
|
||||
save_watched(current_watched)
|
||||
st.rerun()
|
||||
|
||||
# Sync checkbox (persists in session_state automatically)
|
||||
st.checkbox("▶ Add to sync", key=f"sync_{pid}")
|
||||
|
||||
with left:
|
||||
if cached_tracks is None:
|
||||
if st.button("🔍 Load tracks", key=f"load_{pid}"):
|
||||
with st.spinner(f"Loading '{name}'…"):
|
||||
tracks, err = fetch_playlist_tracks(pid, name)
|
||||
st.session_state[tracks_key] = tracks
|
||||
if err:
|
||||
st.session_state[f"tracks_err_{pid}"] = err
|
||||
st.rerun()
|
||||
else:
|
||||
err = st.session_state.get(f"tracks_err_{pid}")
|
||||
if err:
|
||||
st.warning(err)
|
||||
elif cached_tracks:
|
||||
nf_titles = {title for _, title in nf_list}
|
||||
rows = [
|
||||
{
|
||||
"": "❌" if t["name"] in nf_titles else "✅",
|
||||
"Title": t["name"],
|
||||
"Artist": ", ".join(a["name"] for a in t.get("artists", [])),
|
||||
}
|
||||
for t in cached_tracks
|
||||
]
|
||||
st.dataframe(
|
||||
pd.DataFrame(rows),
|
||||
hide_index=True,
|
||||
use_container_width=True,
|
||||
height=min(40 + len(rows) * 35, 300),
|
||||
column_config={
|
||||
"": st.column_config.TextColumn("", width="small"),
|
||||
"Title": st.column_config.TextColumn("Title"),
|
||||
"Artist": st.column_config.TextColumn("Artist"),
|
||||
},
|
||||
)
|
||||
if nf_list:
|
||||
st.caption(f"⚠️ {len(nf_list)} track(s) not found on Tidal")
|
||||
else:
|
||||
st.info("No tracks found for this playlist.")
|
||||
|
||||
st.divider()
|
||||
|
||||
# ── Gather selected playlists & action buttons ────
|
||||
sel_pls = [p for p in spotify_playlists if st.session_state.get(f"sync_{p['id']}", False)]
|
||||
wtch_pls = [p for p in spotify_playlists if p["id"] in load_watched()]
|
||||
|
||||
with sync_btn_col:
|
||||
if st.button(f"▶ Sync Selected ({len(sel_pls)})", disabled=not sel_pls, type="primary", width="stretch"):
|
||||
skipped = run_sync_playlists(sel_pls)
|
||||
for name, err in skipped:
|
||||
st.warning(f"⚠️ Skipped **{name}**: {err}")
|
||||
synced = len(sel_pls) - len(skipped)
|
||||
if synced:
|
||||
st.success(f"✅ {synced} playlist(s) synced!")
|
||||
st.balloons()
|
||||
|
||||
with wtch_btn_col:
|
||||
if st.button(f"👁 Sync Watched ({len(wtch_pls)})", disabled=not wtch_pls, width="stretch"):
|
||||
skipped = run_sync_playlists(wtch_pls)
|
||||
for name, err in skipped:
|
||||
st.warning(f"⚠️ Skipped **{name}**: {err}")
|
||||
synced = len(wtch_pls) - len(skipped)
|
||||
if synced:
|
||||
st.success(f"✅ {synced} watched playlist(s) synced!")
|
||||
st.balloons()
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════
|
||||
# TAB 2 – EXTERNAL PLAYLISTS
|
||||
# ════════════════════════════════════════════════════════
|
||||
with tab_ext:
|
||||
st.subheader("Add a friend's playlist")
|
||||
url_input = st.text_input(
|
||||
"Spotify URL or ID",
|
||||
placeholder="https://open.spotify.com/playlist/… or 3ktReNcDeGj94W59UXHU9R",
|
||||
label_visibility="collapsed",
|
||||
)
|
||||
if st.button("➕ Add Playlist", type="primary"):
|
||||
pid = url_input.strip()
|
||||
if "spotify.com/playlist/" in pid:
|
||||
pid = pid.split("spotify.com/playlist/")[-1].split("?")[0].strip()
|
||||
if pid:
|
||||
with st.spinner("Looking up playlist…"):
|
||||
info = fetch_single_playlist(pid)
|
||||
if info:
|
||||
tf = info.get("tracks") or {}
|
||||
upsert_external_playlist({
|
||||
"id": info["id"],
|
||||
"name": info["name"],
|
||||
"description": info.get("description", ""),
|
||||
"total_tracks": tf.get("total", 0) if isinstance(tf, dict) else 0,
|
||||
"owner": (info.get("owner") or {}).get("display_name", ""),
|
||||
})
|
||||
st.success(f"✅ Added **{info['name']}**")
|
||||
st.rerun()
|
||||
else:
|
||||
st.error("Playlist not found. Is it public? Check the URL/ID.")
|
||||
else:
|
||||
st.warning("Please enter a URL or ID.")
|
||||
|
||||
external = load_external_playlists()
|
||||
if external:
|
||||
st.subheader(f"Saved ({len(external)})")
|
||||
ext_rows = [{"▶ Sync": False, "Playlist": p["name"], "Owner": p.get("owner",""), "Tracks": p.get("total_tracks",0), "ID": p["id"]} for p in external]
|
||||
edit_ext = st.data_editor(
|
||||
pd.DataFrame(ext_rows),
|
||||
column_config={
|
||||
"▶ Sync": st.column_config.CheckboxColumn("▶ Sync", width="small"),
|
||||
"Playlist": st.column_config.TextColumn("Playlist", disabled=True),
|
||||
"Owner": st.column_config.TextColumn("Owner", disabled=True, width="medium"),
|
||||
"Tracks": st.column_config.NumberColumn("Tracks", disabled=True, width="small"),
|
||||
"ID": st.column_config.TextColumn("Spotify ID", disabled=True, width="medium"),
|
||||
},
|
||||
hide_index=True, width="stretch", key="ext_editor",
|
||||
)
|
||||
ext_sel = [p for p in external if p["id"] in set(edit_ext[edit_ext["▶ Sync"]]["ID"].tolist())]
|
||||
if st.button(f"▶ Sync Selected ({len(ext_sel)})", disabled=not ext_sel, type="primary", width="stretch", key="btn_sync_ext"):
|
||||
with st.spinner("Fetching playlist details…"):
|
||||
full_pls = [spotify_session.playlist(p["id"]) for p in ext_sel]
|
||||
skipped = run_sync_playlists(full_pls)
|
||||
for name, err in skipped:
|
||||
st.warning(f"⚠️ Skipped **{name}**: {err}")
|
||||
synced = len(full_pls) - len(skipped)
|
||||
if synced:
|
||||
st.success(f"✅ {synced} external playlist(s) synced!")
|
||||
|
||||
st.divider()
|
||||
rm = st.selectbox("Remove", ["— select —"] + [p["name"] for p in external], key="rm_ext")
|
||||
if rm != "— select —" and st.button("🗑 Remove", type="secondary"):
|
||||
rid = next((p["id"] for p in external if p["name"] == rm), None)
|
||||
if rid:
|
||||
remove_external_playlist(rid)
|
||||
fetch_single_playlist.clear()
|
||||
st.rerun()
|
||||
else:
|
||||
st.info("No external playlists added yet.")
|
||||
|
||||
# ════════════════════════════════════════════════════════
|
||||
# TAB 3 – NOT FOUND
|
||||
# ════════════════════════════════════════════════════════
|
||||
with tab_nf:
|
||||
if not not_found:
|
||||
st.success("✅ No unmatched tracks yet — run a sync first.")
|
||||
else:
|
||||
st.info(f"**{total_nf}** track(s) across **{len(not_found)}** playlist(s) couldn't be matched on Tidal.")
|
||||
flt = st.selectbox("Filter by playlist", ["All playlists"] + sorted(not_found.keys()), key="nf_filter")
|
||||
nf_rows = [
|
||||
{"Playlist": pl, "Artist": a, "Title": t}
|
||||
for pl, items in sorted(not_found.items())
|
||||
if flt in ("All playlists", pl)
|
||||
for a, t in items
|
||||
]
|
||||
if nf_rows:
|
||||
st.dataframe(pd.DataFrame(nf_rows), hide_index=True, use_container_width=True,
|
||||
height=min(80 + len(nf_rows) * 35, 500))
|
||||
else:
|
||||
st.info("No entries for this filter.")
|
||||
|
||||
# ── Liked Songs (opt-in) ──────────────────────────────────────────────────────
|
||||
with st.expander("⭐ Sync Liked Songs (optional)", expanded=False):
|
||||
st.write("Sync your Spotify 'Liked Songs' to your Tidal favorites.")
|
||||
if st.button("Sync Liked Songs Now", type="secondary"):
|
||||
with st.spinner("Syncing liked songs…"):
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(_sync.sync_favorites(spotify_session, tidal_session, config))
|
||||
loop.close()
|
||||
st.success("✅ Liked songs synced!")
|
||||
113
interactive_sync.py
Normal file
113
interactive_sync.py
Normal file
@@ -0,0 +1,113 @@
|
||||
import yaml
|
||||
import sys
|
||||
import asyncio
|
||||
from typing import List, Dict, Any
|
||||
|
||||
from spotify_to_tidal import auth as _auth
|
||||
from spotify_to_tidal import sync as _sync
|
||||
from spotify_to_tidal.tidalapi_patch import get_all_playlists
|
||||
|
||||
def print_menu(playlists: List[Dict[str, Any]]):
|
||||
print("\n" + "="*50)
|
||||
print(f"🎵 Spotify Playlists Found ({len(playlists)} total) 🎵")
|
||||
print("="*50)
|
||||
for i, p in enumerate(playlists):
|
||||
track_count = p.get('tracks', {}).get('total', 'Unknown')
|
||||
print(f"[{i+1:2d}] {p['name']:<35} ({track_count:>3} tracks) | ID: {p['id']}")
|
||||
print("="*50)
|
||||
print("Enter the numbers of the playlists you want to sync, separated by commas (e.g., 1, 4, 12).")
|
||||
print("Enter 'ALL' to sync all playlists.")
|
||||
print("Enter 'Q' to quit.")
|
||||
|
||||
async def main_async():
|
||||
# 1. Load config
|
||||
try:
|
||||
with open('config.yml', 'r') as f:
|
||||
config = yaml.safe_load(f)
|
||||
except Exception as e:
|
||||
sys.exit(f"Failed to load config.yml: {e}")
|
||||
|
||||
# 2. Authenticate
|
||||
print("Authenticating with Spotify...")
|
||||
spotify_session = _auth.open_spotify_session(config['spotify'])
|
||||
|
||||
print("Authenticating with Tidal...")
|
||||
tidal_session = _auth.open_tidal_session()
|
||||
if not tidal_session.check_login():
|
||||
sys.exit("Could not connect to Tidal")
|
||||
|
||||
# 3. Fetch all Spotify user playlists
|
||||
print("\nFetching Spotify playlists...")
|
||||
spotify_playlists = await _sync.get_playlists_from_spotify(spotify_session, config)
|
||||
|
||||
if not spotify_playlists:
|
||||
print("No Spotify playlists found!")
|
||||
return
|
||||
|
||||
# 4. Interactive menu loop
|
||||
while True:
|
||||
print_menu(spotify_playlists)
|
||||
choice = input("\nYour choice: ").strip().lower()
|
||||
|
||||
if choice == 'q':
|
||||
print("Exiting...")
|
||||
return
|
||||
|
||||
selected_playlists = []
|
||||
if choice == 'all':
|
||||
selected_playlists = spotify_playlists
|
||||
print(f"Selected ALL {len(selected_playlists)} playlists for synchronization.")
|
||||
break
|
||||
|
||||
else:
|
||||
try:
|
||||
# Parse comma-separated indices
|
||||
indices = [int(x.strip()) - 1 for x in choice.split(',')]
|
||||
for idx in indices:
|
||||
if 0 <= idx < len(spotify_playlists):
|
||||
selected_playlists.append(spotify_playlists[idx])
|
||||
else:
|
||||
print(f"Warning: Number {idx + 1} is out of range, ignoring.")
|
||||
|
||||
if not selected_playlists:
|
||||
print("No valid playlists selected. Please try again.")
|
||||
continue
|
||||
|
||||
print(f"Selected {len(selected_playlists)} playlists for synchronization.")
|
||||
for p in selected_playlists:
|
||||
print(f" - {p['name']}")
|
||||
|
||||
confirm = input("Proceed? (Y/n): ").strip().lower()
|
||||
if confirm in ('', 'y', 'yes'):
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numbers separated by commas (e.g. 1, 2, 3) or 'ALL'.")
|
||||
|
||||
# 5. Execute the sync for the selected playlists
|
||||
if not selected_playlists:
|
||||
return
|
||||
|
||||
print("\n=== Starting Synchronization ===")
|
||||
tidal_playlists = {p.name: p for p in await get_all_playlists(tidal_session.user)}
|
||||
|
||||
mappings_to_sync = []
|
||||
for sp_playlist in selected_playlists:
|
||||
mappings_to_sync.append(_sync.pick_tidal_playlist_for_spotify_playlist(sp_playlist, tidal_playlists))
|
||||
|
||||
for index, (sp_playlist, td_playlist) in enumerate(mappings_to_sync):
|
||||
print(f"\n[{index+1}/{len(mappings_to_sync)}] Syncing playlist: {sp_playlist['name']}")
|
||||
await _sync.sync_playlist(spotify_session, tidal_session, sp_playlist, td_playlist, config)
|
||||
|
||||
print("\n=== Synchronization Complete ===")
|
||||
|
||||
# Optionally sync favorites if enabled in config
|
||||
if config.get('sync_favorites_default', True):
|
||||
fav_confirm = input("\nWould you like to sync your 'Liked Songs' as well? (Y/n): ").strip().lower()
|
||||
if fav_confirm in ('', 'y', 'yes'):
|
||||
await _sync.sync_favorites(spotify_session, tidal_session, config)
|
||||
|
||||
def main():
|
||||
asyncio.run(main_async())
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -4,12 +4,12 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "spotify_to_tidal"
|
||||
version = "1.0.6"
|
||||
version = "1.0.7"
|
||||
requires-python = ">= 3.10"
|
||||
|
||||
dependencies = [
|
||||
"spotipy~=2.24",
|
||||
"tidalapi==0.8",
|
||||
"tidalapi~=0.8.10",
|
||||
"pyyaml~=6.0",
|
||||
"tqdm~=4.64",
|
||||
"sqlalchemy~=2.0",
|
||||
|
||||
28
session.json
Normal file
28
session.json
Normal file
@@ -0,0 +1,28 @@
|
||||
{
|
||||
"watched": [
|
||||
"2sLEYLo3a6dDHbl4ypUkcl",
|
||||
"3Sjb6ZlsPxIpzOMZVoq1pW",
|
||||
"3eNcChQ2KssR8QjuV7KUEc",
|
||||
"44Shnwiw8xk3UmKf7Unp6o",
|
||||
"4ISU1ED9Vx8SH8o3JQVttw",
|
||||
"4iVWLq7tdRfLzTbKsYhM56",
|
||||
"5d49VR3snvfwyAJxJgB20s",
|
||||
"7KG4kWpAQMhJxudcO42YkG"
|
||||
],
|
||||
"external_playlists": [
|
||||
{
|
||||
"id": "3ysFr4SgWB6IfzKgTij9Kj",
|
||||
"name": "Down Down Down",
|
||||
"description": "",
|
||||
"total_tracks": 0,
|
||||
"owner": "Lino Toran Jenner"
|
||||
},
|
||||
{
|
||||
"id": "4YqSwVf1etm2ILxOwifQyv",
|
||||
"name": "SunSet [Suppe]",
|
||||
"description": "",
|
||||
"total_tracks": 0,
|
||||
"owner": "Lino Toran Jenner"
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -11,7 +11,7 @@ __all__ = [
|
||||
'open_tidal_session'
|
||||
]
|
||||
|
||||
SPOTIFY_SCOPES = 'playlist-read-private, user-library-read'
|
||||
SPOTIFY_SCOPES = 'playlist-read-private playlist-read-collaborative user-library-read'
|
||||
|
||||
def open_spotify_session(config) -> spotipy.Spotify:
|
||||
credentials_manager = spotipy.SpotifyOAuth(username=config['username'],
|
||||
|
||||
@@ -139,6 +139,10 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
|
||||
try:
|
||||
return await function(*args, **kwargs)
|
||||
except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException, spotipy.exceptions.SpotifyException) as e:
|
||||
# 403 Forbidden: retrying will never help — fail fast to avoid hanging
|
||||
if isinstance(e, spotipy.exceptions.SpotifyException) and e.http_status == 403:
|
||||
raise RuntimeError(f"Access denied (403) for playlist — it may be private or collaborative: {e}") from e
|
||||
|
||||
if remaining:
|
||||
print(f"{str(e)} occurred, retrying {remaining} times")
|
||||
else:
|
||||
@@ -161,7 +165,7 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs):
|
||||
async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[dict]:
|
||||
output = []
|
||||
results = fetch_function(0)
|
||||
output.extend([item['track'] for item in results['items'] if item['track'] is not None])
|
||||
output.extend([item.get('track') or item.get('item') for item in results['items'] if item.get('track') or item.get('item')])
|
||||
|
||||
# Get all the remaining tracks in parallel
|
||||
if results['next']:
|
||||
@@ -171,15 +175,14 @@ async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[di
|
||||
desc="Fetching additional data chunks"
|
||||
)
|
||||
for extra_result in extra_results:
|
||||
output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None])
|
||||
output.extend([item.get('track') or item.get('item') for item in extra_result['items'] if item.get('track') or item.get('item')])
|
||||
|
||||
return output
|
||||
|
||||
|
||||
async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist):
|
||||
def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str):
|
||||
fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc))),type"
|
||||
return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset)
|
||||
return spotify_session.playlist_tracks(playlist_id=playlist_id, offset=offset)
|
||||
|
||||
print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'")
|
||||
items = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, lambda offset: _get_tracks_from_spotify_playlist(offset=offset, playlist_id=spotify_playlist["id"]))
|
||||
|
||||
Reference in New Issue
Block a user