diff --git a/bot/services/yt.py b/bot/services/yt.py index 74d7f92..620033b 100644 --- a/bot/services/yt.py +++ b/bot/services/yt.py @@ -1,20 +1,20 @@ from __future__ import annotations - import logging import time from typing import Any, Dict, List, Optional, TYPE_CHECKING from googleapiclient.discovery import build from googleapiclient.errors import HttpError + +if TYPE_CHECKING: + from bot import Bot + from yt_dlp import YoutubeDL +from yt_dlp.downloader import get_suitable_downloader from bot.config.models import YtModel from bot.player.enums import TrackType from bot.player.track import Track from bot.services import Service as _Service from bot import errors -import ffmpeg # ffmpeg-python for conversion - -if TYPE_CHECKING: - from bot import Bot class YtService(_Service): def __init__(self, bot: Bot, config: YtModel): @@ -27,15 +27,15 @@ class YtService(_Service): self.warning_message = "" self.help = "" self.hidden = False - + self._last_search = time.time() self._search_interval = 1.0 - self._search_results_cache: Dict[str, tuple[float, List[Track]]] = {} - self._track_cache: Dict[str, tuple[float, Track]] = {} + self._search_results_cache = {} + self._track_cache = {} self._cache_timeout = 300 - + + API_KEY = 'AIzaSyAnXGFy067AjtuuySsldXi17ysOEQW_ssw' try: - API_KEY = 'AIzaSyAnXGFy067AjtuuySsldXi17ysOEQW_ssw' self.youtube_api = build("youtube", "v3", developerKey=API_KEY) except Exception as e: logging.error(f"Failed to initialize YouTube API: {str(e)}") @@ -44,104 +44,128 @@ class YtService(_Service): def initialize(self): self._ydl_config = { - "skip_download": True, - "format": "bestaudio/best[protocol!=m3u8_native]/best", + "format": "bestaudio/best", + "outtmpl": "%(title)s.%(ext)s", + "postprocessors": [ + { # Tambah postprocessor biar otomatis convert ke mp3 + "key": "FFmpegExtractAudio", + "preferredcodec": "mp3", + "preferredquality": "192", + } + ], "socket_timeout": 5, "logger": logging.getLogger("root"), "cookiefile": "/home/ttbot/data/cookies.txt" } - def convert_to_mp3(self, input_path: str, output_path: str): - try: - stream = ffmpeg.input(input_path) - stream = ffmpeg.output(stream, output_path, **{'b:a': '192k', 'format': 'mp3'}) - ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True) - except Exception as e: - logging.error(f"Error converting to MP3: {str(e)}", exc_info=True) - raise errors.ServiceError("Failed to convert to MP3") - def download(self, track: Track, file_path: str) -> None: - info = track.extra_info - if not info: - raise errors.ServiceError("Missing track extra_info for download") - - temp_path = file_path.replace('.mp3', '_temp.m4a') - try: + info = track.extra_info + if not info: + super().download(track, file_path) + return + + # Setting manual output template supaya filenya langsung ke file_path yang dikasih ydl_opts = self._ydl_config.copy() - ydl_opts.update({ - "outtmpl": temp_path, - "format": "bestaudio/best", - "quiet": True, - "no_warnings": True, - "postprocessors": [], - }) + ydl_opts["outtmpl"] = file_path.rsplit(".", 1)[0] + ".mp3" with YoutubeDL(ydl_opts) as ydl: - ydl.download([info.get('webpage_url', track.url)]) - - self.convert_to_mp3(temp_path, file_path) - logging.info(f"Downloaded and converted file: {file_path}") - + dl = get_suitable_downloader(info)(ydl, ydl_opts) + dl.download(file_path, info) except Exception as e: logging.error(f"Download error: {str(e)}", exc_info=True) raise errors.ServiceError("Download failed") - def get(self, url: str, extra_info: Optional[Dict[str, Any]] = None, process: bool = False) -> List[Track]: + def get( + self, + url: str, + extra_info: Optional[Dict[str, Any]] = None, + process: bool = False, + ) -> List[Track]: if not (url or extra_info): - raise errors.InvalidArgumentError("URL or extra_info must be provided") + raise errors.InvalidArgumentError() - cache_key = extra_info.get('webpage_url', '') if extra_info else url + cache_key = url or extra_info.get('webpage_url', '') if extra_info else '' if cache_key in self._track_cache and not process: cache_time, track = self._track_cache[cache_key] if time.time() - cache_time < self._cache_timeout: return [track] - + try: with YoutubeDL(self._ydl_config) as ydl: - if extra_info and not process: - info = extra_info + if process and extra_info: + info = ydl.extract_info(extra_info.get('webpage_url', url), process=False) + elif not extra_info: + info = ydl.extract_info(url, process=False) else: - info = ydl.extract_info(url, download=False) + info = extra_info - if info.get('_type') == 'url' and not info.get('ie_key'): - return self.get(info['url'], process=False) - - if info.get('_type') == 'playlist': + info_type = info.get("_type") + + if info_type == "url" and not info.get("ie_key"): + return self.get(info["url"], process=False) + + elif info_type == "playlist": tracks: List[Track] = [] - for entry in info['entries']: - tracks.extend(self.get("", extra_info=entry, process=False)) + for entry in info["entries"]: + data = self.get("", extra_info=entry, process=False) + tracks.extend(data) return tracks - + + if not process: + track = Track( + service=self.name, + url=info.get('webpage_url', url) or url, + name=info.get('title', ''), + extra_info=info, + format=info.get('ext', ''), + type=TrackType.Dynamic + ) + + self._track_cache[cache_key] = (time.time(), track) + return [track] + + stream = ydl.process_ie_result(info, download=False) + + if "url" not in stream: + raise errors.ServiceError("No URL found in stream info") + + title = stream.get("title", "") + if stream.get("uploader"): + title += f" - {stream['uploader']}" + + track_type = TrackType.Live if stream.get("is_live") else TrackType.Default + track = Track( service=self.name, - url=info.get('webpage_url', url) or url, - name=info.get('title', 'Unknown Title'), - extra_info=info, - format=info.get('ext', ''), - type=TrackType.Dynamic, + url=stream["url"], + name=title, + format=stream.get("ext", ""), + type=track_type, + extra_info=stream ) - + self._track_cache[cache_key] = (time.time(), track) return [track] - + except Exception as e: - logging.error(f"Failed to get track info: {str(e)}", exc_info=True) + logging.error(f"Error getting track info: {str(e)}", exc_info=True) raise errors.ServiceError(f"Failed to get track info: {str(e)}") def search(self, query: str) -> List[Track]: current_time = time.time() - + if query in self._search_results_cache: cache_time, results = self._search_results_cache[query] if current_time - cache_time < self._cache_timeout: return results.copy() - - if (delay := self._search_interval - (current_time - self._last_search)) > 0: - time.sleep(delay) - + + time_since_last = current_time - self._last_search + if time_since_last < self._search_interval: + time.sleep(self._search_interval - time_since_last) + try: - response = self.youtube_api.search().list( + search_response = self.youtube_api.search().list( q=query, part="snippet", maxResults=25, @@ -149,32 +173,30 @@ class YtService(_Service): ).execute() tracks = [] - for item in response.get('items', []): - video_id = item.get('id', {}).get('videoId') - if video_id: - url = f"https://www.youtube.com/watch?v={video_id}" - title = item.get('snippet', {}).get('title', 'Unknown Title') + for search_result in search_response.get("items", []): + if 'videoId' in search_result.get('id', {}): + video_url = f"https://www.youtube.com/watch?v={search_result['id']['videoId']}" track = Track( service=self.name, - url=url, - name=title, + url=video_url, + name=search_result["snippet"]["title"], type=TrackType.Dynamic, - extra_info={"webpage_url": url, "title": title}, + extra_info={"webpage_url": video_url, "title": search_result["snippet"]["title"]} ) - self._track_cache[url] = (current_time, track) + + self._track_cache[video_url] = (current_time, track.get_raw()) tracks.append(track) self._search_results_cache[query] = (current_time, tracks.copy()) self._last_search = current_time - if not tracks: - raise errors.NothingFoundError("No videos found") - - return tracks + if tracks: + return tracks + raise errors.NothingFoundError("No videos found") except HttpError as e: logging.error(f"YouTube API error: {str(e)}", exc_info=True) raise errors.NothingFoundError(f"YouTube search failed: {str(e)}") except Exception as e: - logging.error(f"Unexpected error during search: {str(e)}", exc_info=True) - raise errors.ServiceError(f"Search failed: {str(e)}") + logging.error(f"Unexpected error in search: {str(e)}", exc_info=True) + raise errors.ServiceError(f"Search failed: {str(e)}") \ No newline at end of file