64 lines
1.8 KiB
Python
64 lines
1.8 KiB
Python
import subprocess
|
|
import os
|
|
from config import settings
|
|
import requests
|
|
|
|
def detect_query_type(query: str):
|
|
if "youtube.com/playlist" in query or "list=" in query:
|
|
return "playlist"
|
|
elif "youtube.com/watch" in query or "youtu.be/" in query:
|
|
return "video"
|
|
else:
|
|
return "search"
|
|
|
|
def download_song(query: str):
|
|
output_template = os.path.join(settings.MUSIC_DIR, "%(uploader)s/%(title)s.%(ext)s")
|
|
|
|
query_type = detect_query_type(query)
|
|
|
|
if query_type == "video":
|
|
yt_query = query
|
|
playlist_flag = "--no-playlist"
|
|
extra_filters = []
|
|
elif query_type == "playlist":
|
|
yt_query = query
|
|
playlist_flag = None
|
|
extra_filters = []
|
|
else:
|
|
yt_query = f"ytsearch20:{query}"
|
|
playlist_flag = "--no-playlist"
|
|
filter_expr = f"'{query.lower()}' in uploader.lower()" # ✅ valid expression
|
|
extra_filters = ["--match-filter", filter_expr]
|
|
|
|
command = [
|
|
"yt-dlp",
|
|
yt_query,
|
|
"--extract-audio",
|
|
"--audio-format", "mp3",
|
|
"--output", output_template,
|
|
"--quiet"
|
|
]
|
|
|
|
if playlist_flag:
|
|
command.append(playlist_flag)
|
|
if extra_filters:
|
|
command.extend(extra_filters)
|
|
|
|
result = subprocess.run(command, capture_output=True, text=True)
|
|
|
|
if result.returncode != 0:
|
|
raise Exception(f"❌ Download failed:\n{result.stderr}")
|
|
|
|
if settings.NAVIDROME_SCAN_URL:
|
|
try:
|
|
res = requests.get(settings.NAVIDROME_SCAN_URL, timeout=5)
|
|
res.raise_for_status()
|
|
except Exception as e:
|
|
print(f"⚠️ Failed to trigger Navidrome rescan: {e}")
|
|
|
|
return {
|
|
"status": "success",
|
|
"query": query,
|
|
"log": result.stdout + "\n" + result.stderr
|
|
}
|