from __future__ import annotations

import logging
import os
import time
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

if TYPE_CHECKING:
    from bot import Bot

from yt_dlp import YoutubeDL
from yt_dlp.downloader import get_suitable_downloader

from bot.config.models import YtModel
from bot.player.enums import TrackType
from bot.player.track import Track
from bot.services import Service as _Service
from bot import errors


class YtService(_Service):
    """YouTube service backed by yt-dlp (extraction) and the YouTube Data API (search).

    Track lookups and search results are memoised in per-instance dicts for
    ``_cache_timeout`` seconds, and consecutive API searches are spaced at
    least ``_search_interval`` seconds apart.
    """

    # SECURITY: this key was hard-coded in the original source and is kept only
    # as a backward-compatible fallback. It should be rotated and supplied via
    # configuration or the YOUTUBE_API_KEY environment variable instead.
    _FALLBACK_API_KEY = 'AIzaSyAnXGFy067AjtuuySsldXi17ysOEQW_ssw'

    def __init__(self, bot: Bot, config: YtModel):
        """Store the bot/config references and build the YouTube API client.

        The API key is taken from ``config.api_key`` when the config object
        has such an attribute, then from the ``YOUTUBE_API_KEY`` environment
        variable, and finally from the legacy built-in key. If the client
        cannot be built the service disables itself and records an error
        message instead of raising.
        """
        self.bot = bot
        self.config = config
        self.name = "yt"
        self.hostnames = []
        self.is_enabled = self.config.enabled
        self.error_message = ""
        self.warning_message = ""
        self.help = ""
        self.hidden = False
        self._last_search = time.time()
        self._search_interval = 1.0
        # query -> (timestamp, tracks)
        self._search_results_cache: Dict[str, Tuple[float, List[Track]]] = {}
        # url -> (timestamp, track)
        self._track_cache: Dict[str, Tuple[float, Any]] = {}
        self._cache_timeout = 300

        api_key = (
            getattr(config, "api_key", None)
            or os.environ.get("YOUTUBE_API_KEY")
            or self._FALLBACK_API_KEY
        )
        try:
            self.youtube_api = build("youtube", "v3", developerKey=api_key)
        except Exception as e:
            logging.error(f"Failed to initialize YouTube API: {str(e)}")
            # Keep the attribute defined so search() can fail with a clear error.
            self.youtube_api = None
            self.is_enabled = False
            self.error_message = "YouTube API initialization failed"

    def initialize(self):
        """Prepare the yt-dlp options shared by ``get`` and ``download``."""
        self._ydl_config = {
            "format": "bestaudio/best",
            "outtmpl": "%(title)s.%(ext)s",
            "socket_timeout": 5,
            "logger": logging.getLogger("root"),
            "cookiefile": "/home/ttbot/data/cookies.txt",
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "192",
                }
            ],
            "postprocessor_args": [
                "-ar", "44100"
            ],
            "prefer_ffmpeg": True,
            "keepvideo": False,
        }

    def download(self, track: Track, file_path: str) -> None:
        """Download ``track`` through yt-dlp.

        When the track carries no ``extra_info`` the base-class download is
        used. Otherwise the output template is ``file_path`` with everything
        after its last ``.`` replaced by ``mp3``.

        Raises:
            errors.ServiceError: if any step of the download fails; the
                original exception is attached as ``__cause__``.
        """
        try:
            info = track.extra_info
            if not info:
                super().download(track, file_path)
                return
            # Shallow copy: only the top-level "outtmpl" entry is overridden.
            ydl_opts = self._ydl_config.copy()
            ydl_opts["outtmpl"] = file_path.rsplit(".", 1)[0] + ".mp3"
            with YoutubeDL(ydl_opts) as ydl:
                ydl.process_ie_result(info, download=True)
        except Exception as e:
            logging.error(f"Download error: {str(e)}", exc_info=True)
            raise errors.ServiceError("Download failed") from e

    def get(
        self,
        url: str,
        extra_info: Optional[Dict[str, Any]] = None,
        process: bool = False,
    ) -> List[Track]:
        """Resolve ``url`` (or a previously extracted ``extra_info``) to tracks.

        Args:
            url: Page URL to extract; may be empty when ``extra_info`` is given.
            extra_info: Info dict from an earlier extraction.
            process: When true, resolve the final stream URL; when false,
                return a ``TrackType.Dynamic`` track holding the raw info.

        Returns:
            A list with one track, or one track per entry for playlists.

        Raises:
            errors.InvalidArgumentError: if both ``url`` and ``extra_info``
                are empty.
            errors.ServiceError: if extraction fails for any reason.
        """
        if not (url or extra_info):
            raise errors.InvalidArgumentError()

        # The parentheses matter: without them the conditional expression
        # swallows ``url`` and every URL-only lookup shares the key "".
        cache_key = url or (
            (extra_info.get('webpage_url', '') or '') if extra_info else ''
        )

        if cache_key and not process:
            cached = self._track_cache.get(cache_key)
            if cached is not None:
                cache_time, cached_track = cached
                if time.time() - cache_time < self._cache_timeout:
                    return [cached_track]

        try:
            with YoutubeDL(self._ydl_config) as ydl:
                if process and extra_info:
                    info = ydl.extract_info(
                        extra_info.get('webpage_url', url), process=False
                    )
                elif not extra_info:
                    info = ydl.extract_info(url, process=False)
                else:
                    info = extra_info

                info_type = info.get("_type")
                if info_type == "url" and not info.get("ie_key"):
                    # A bare redirect without an extractor key: resolve its target.
                    return self.get(info["url"], process=False)
                elif info_type == "playlist":
                    tracks: List[Track] = []
                    for entry in info["entries"]:
                        tracks.extend(self.get("", extra_info=entry, process=False))
                    return tracks

                if not process:
                    track = Track(
                        service=self.name,
                        url=info.get('webpage_url', url) or url,
                        name=info.get('title', ''),
                        extra_info=info,
                        format=info.get('ext', ''),
                        type=TrackType.Dynamic
                    )
                    self._remember_track(cache_key, track)
                    return [track]

                stream = ydl.process_ie_result(info, download=False)
                if "url" not in stream:
                    raise errors.ServiceError("No URL found in stream info")

                title = stream.get("title", "")
                if stream.get("uploader"):
                    title += f" - {stream['uploader']}"
                track_type = (
                    TrackType.Live if stream.get("is_live") else TrackType.Default
                )
                track = Track(
                    service=self.name,
                    url=stream["url"],
                    name=title,
                    format=stream.get("ext", ""),
                    type=track_type,
                    extra_info=stream
                )
                self._remember_track(cache_key, track)
                return [track]
        except Exception as e:
            logging.error(f"Error getting track info: {str(e)}", exc_info=True)
            raise errors.ServiceError(f"Failed to get track info: {str(e)}") from e

    def _remember_track(self, cache_key: str, track: Track) -> None:
        """Cache ``track`` under ``cache_key``; entries without a key are skipped."""
        if cache_key:
            self._track_cache[cache_key] = (time.time(), track)

    def search(self, query: str) -> List[Track]:
        """Search YouTube videos for ``query`` through the Data API.

        Results are cached per query for ``_cache_timeout`` seconds, and the
        call sleeps as needed so that requests are at least
        ``_search_interval`` seconds apart.

        Returns:
            A non-empty list of ``TrackType.Dynamic`` tracks.

        Raises:
            errors.NothingFoundError: if the API reports an HTTP error or the
                search (fresh or cached) yields no videos.
            errors.ServiceError: if the API client is unavailable or any other
                error occurs.
        """
        current_time = time.time()

        cached = self._search_results_cache.get(query)
        if cached is not None:
            cache_time, results = cached
            if current_time - cache_time < self._cache_timeout:
                if not results:
                    # Stay consistent with the uncached path for empty results.
                    raise errors.NothingFoundError("No videos found")
                return results.copy()

        youtube_api = getattr(self, "youtube_api", None)
        if youtube_api is None:
            raise errors.ServiceError("Search failed: YouTube API is not initialized")

        time_since_last = current_time - self._last_search
        if time_since_last < self._search_interval:
            time.sleep(self._search_interval - time_since_last)

        # Stamp the request after the wait so the next call throttles against
        # the moment the request was actually issued, even if it fails.
        request_time = time.time()
        self._last_search = request_time

        try:
            search_response = youtube_api.search().list(
                q=query,
                part="snippet",
                maxResults=25,
                type="video"
            ).execute()

            tracks = []
            for search_result in search_response.get("items", []):
                if 'videoId' not in search_result.get('id', {}):
                    continue
                video_url = f"https://www.youtube.com/watch?v={search_result['id']['videoId']}"
                title = search_result["snippet"]["title"]
                track = Track(
                    service=self.name,
                    url=video_url,
                    name=title,
                    type=TrackType.Dynamic,
                    extra_info={"webpage_url": video_url, "title": title}
                )
                # NOTE(review): get() returns cached values as tracks, so this
                # assumes get_raw() yields a Track-compatible object - confirm.
                self._track_cache[video_url] = (request_time, track.get_raw())
                tracks.append(track)

            self._search_results_cache[query] = (request_time, tracks.copy())
        except HttpError as e:
            logging.error(f"YouTube API error: {str(e)}", exc_info=True)
            raise errors.NothingFoundError(f"YouTube search failed: {str(e)}") from e
        except Exception as e:
            logging.error(f"Unexpected error in search: {str(e)}", exc_info=True)
            raise errors.ServiceError(f"Search failed: {str(e)}") from e

        # Raised outside the try block so the generic handler above cannot
        # convert it into a ServiceError.
        if not tracks:
            raise errors.NothingFoundError("No videos found")
        return tracks