fixing yt.py pt2.
Some checks are pending
Build-nightly / docker (push) Waiting to run

This commit is contained in:
Umiko 2025-05-05 08:17:25 +07:00
parent b0d563edb7
commit adb6e075a4

View File

@ -1,20 +1,20 @@
from __future__ import annotations
import logging
import time
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
if TYPE_CHECKING:
from bot import Bot
from yt_dlp import YoutubeDL
from yt_dlp.downloader import get_suitable_downloader
from bot.config.models import YtModel
from bot.player.enums import TrackType
from bot.player.track import Track
from bot.services import Service as _Service
from bot import errors
import ffmpeg # ffmpeg-python for conversion
if TYPE_CHECKING:
from bot import Bot
class YtService(_Service):
def __init__(self, bot: Bot, config: YtModel):
@ -27,15 +27,15 @@ class YtService(_Service):
self.warning_message = ""
self.help = ""
self.hidden = False
self._last_search = time.time()
self._search_interval = 1.0
self._search_results_cache: Dict[str, tuple[float, List[Track]]] = {}
self._track_cache: Dict[str, tuple[float, Track]] = {}
self._search_results_cache = {}
self._track_cache = {}
self._cache_timeout = 300
API_KEY = 'AIzaSyAnXGFy067AjtuuySsldXi17ysOEQW_ssw'
try:
API_KEY = 'AIzaSyAnXGFy067AjtuuySsldXi17ysOEQW_ssw'
self.youtube_api = build("youtube", "v3", developerKey=API_KEY)
except Exception as e:
logging.error(f"Failed to initialize YouTube API: {str(e)}")
@ -44,104 +44,128 @@ class YtService(_Service):
def initialize(self):
self._ydl_config = {
"skip_download": True,
"format": "bestaudio/best[protocol!=m3u8_native]/best",
"format": "bestaudio/best",
"outtmpl": "%(title)s.%(ext)s",
"postprocessors": [
{ # Tambah postprocessor biar otomatis convert ke mp3
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": "192",
}
],
"socket_timeout": 5,
"logger": logging.getLogger("root"),
"cookiefile": "/home/ttbot/data/cookies.txt"
}
def convert_to_mp3(self, input_path: str, output_path: str):
try:
stream = ffmpeg.input(input_path)
stream = ffmpeg.output(stream, output_path, **{'b:a': '192k', 'format': 'mp3'})
ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True)
except Exception as e:
logging.error(f"Error converting to MP3: {str(e)}", exc_info=True)
raise errors.ServiceError("Failed to convert to MP3")
def download(self, track: Track, file_path: str) -> None:
info = track.extra_info
if not info:
raise errors.ServiceError("Missing track extra_info for download")
temp_path = file_path.replace('.mp3', '_temp.m4a')
try:
info = track.extra_info
if not info:
super().download(track, file_path)
return
# Setting manual output template supaya filenya langsung ke file_path yang dikasih
ydl_opts = self._ydl_config.copy()
ydl_opts.update({
"outtmpl": temp_path,
"format": "bestaudio/best",
"quiet": True,
"no_warnings": True,
"postprocessors": [],
})
ydl_opts["outtmpl"] = file_path.rsplit(".", 1)[0] + ".mp3"
with YoutubeDL(ydl_opts) as ydl:
ydl.download([info.get('webpage_url', track.url)])
self.convert_to_mp3(temp_path, file_path)
logging.info(f"Downloaded and converted file: {file_path}")
dl = get_suitable_downloader(info)(ydl, ydl_opts)
dl.download(file_path, info)
except Exception as e:
logging.error(f"Download error: {str(e)}", exc_info=True)
raise errors.ServiceError("Download failed")
def get(self, url: str, extra_info: Optional[Dict[str, Any]] = None, process: bool = False) -> List[Track]:
def get(
self,
url: str,
extra_info: Optional[Dict[str, Any]] = None,
process: bool = False,
) -> List[Track]:
if not (url or extra_info):
raise errors.InvalidArgumentError("URL or extra_info must be provided")
raise errors.InvalidArgumentError()
cache_key = extra_info.get('webpage_url', '') if extra_info else url
cache_key = url or extra_info.get('webpage_url', '') if extra_info else ''
if cache_key in self._track_cache and not process:
cache_time, track = self._track_cache[cache_key]
if time.time() - cache_time < self._cache_timeout:
return [track]
try:
with YoutubeDL(self._ydl_config) as ydl:
if extra_info and not process:
info = extra_info
if process and extra_info:
info = ydl.extract_info(extra_info.get('webpage_url', url), process=False)
elif not extra_info:
info = ydl.extract_info(url, process=False)
else:
info = ydl.extract_info(url, download=False)
info = extra_info
if info.get('_type') == 'url' and not info.get('ie_key'):
return self.get(info['url'], process=False)
if info.get('_type') == 'playlist':
info_type = info.get("_type")
if info_type == "url" and not info.get("ie_key"):
return self.get(info["url"], process=False)
elif info_type == "playlist":
tracks: List[Track] = []
for entry in info['entries']:
tracks.extend(self.get("", extra_info=entry, process=False))
for entry in info["entries"]:
data = self.get("", extra_info=entry, process=False)
tracks.extend(data)
return tracks
if not process:
track = Track(
service=self.name,
url=info.get('webpage_url', url) or url,
name=info.get('title', ''),
extra_info=info,
format=info.get('ext', ''),
type=TrackType.Dynamic
)
self._track_cache[cache_key] = (time.time(), track)
return [track]
stream = ydl.process_ie_result(info, download=False)
if "url" not in stream:
raise errors.ServiceError("No URL found in stream info")
title = stream.get("title", "")
if stream.get("uploader"):
title += f" - {stream['uploader']}"
track_type = TrackType.Live if stream.get("is_live") else TrackType.Default
track = Track(
service=self.name,
url=info.get('webpage_url', url) or url,
name=info.get('title', 'Unknown Title'),
extra_info=info,
format=info.get('ext', ''),
type=TrackType.Dynamic,
url=stream["url"],
name=title,
format=stream.get("ext", ""),
type=track_type,
extra_info=stream
)
self._track_cache[cache_key] = (time.time(), track)
return [track]
except Exception as e:
logging.error(f"Failed to get track info: {str(e)}", exc_info=True)
logging.error(f"Error getting track info: {str(e)}", exc_info=True)
raise errors.ServiceError(f"Failed to get track info: {str(e)}")
def search(self, query: str) -> List[Track]:
current_time = time.time()
if query in self._search_results_cache:
cache_time, results = self._search_results_cache[query]
if current_time - cache_time < self._cache_timeout:
return results.copy()
if (delay := self._search_interval - (current_time - self._last_search)) > 0:
time.sleep(delay)
time_since_last = current_time - self._last_search
if time_since_last < self._search_interval:
time.sleep(self._search_interval - time_since_last)
try:
response = self.youtube_api.search().list(
search_response = self.youtube_api.search().list(
q=query,
part="snippet",
maxResults=25,
@ -149,32 +173,30 @@ class YtService(_Service):
).execute()
tracks = []
for item in response.get('items', []):
video_id = item.get('id', {}).get('videoId')
if video_id:
url = f"https://www.youtube.com/watch?v={video_id}"
title = item.get('snippet', {}).get('title', 'Unknown Title')
for search_result in search_response.get("items", []):
if 'videoId' in search_result.get('id', {}):
video_url = f"https://www.youtube.com/watch?v={search_result['id']['videoId']}"
track = Track(
service=self.name,
url=url,
name=title,
url=video_url,
name=search_result["snippet"]["title"],
type=TrackType.Dynamic,
extra_info={"webpage_url": url, "title": title},
extra_info={"webpage_url": video_url, "title": search_result["snippet"]["title"]}
)
self._track_cache[url] = (current_time, track)
self._track_cache[video_url] = (current_time, track.get_raw())
tracks.append(track)
self._search_results_cache[query] = (current_time, tracks.copy())
self._last_search = current_time
if not tracks:
raise errors.NothingFoundError("No videos found")
return tracks
if tracks:
return tracks
raise errors.NothingFoundError("No videos found")
except HttpError as e:
logging.error(f"YouTube API error: {str(e)}", exc_info=True)
raise errors.NothingFoundError(f"YouTube search failed: {str(e)}")
except Exception as e:
logging.error(f"Unexpected error during search: {str(e)}", exc_info=True)
raise errors.ServiceError(f"Search failed: {str(e)}")
logging.error(f"Unexpected error in search: {str(e)}", exc_info=True)
raise errors.ServiceError(f"Search failed: {str(e)}")