from __future__ import annotations import logging import time from typing import Any, Dict, List, Optional, TYPE_CHECKING from googleapiclient.discovery import build from googleapiclient.errors import HttpError from yt_dlp import YoutubeDL from bot.config.models import YtModel from bot.player.enums import TrackType from bot.player.track import Track from bot.services import Service as _Service from bot import errors import ffmpeg # ffmpeg-python for conversion if TYPE_CHECKING: from bot import Bot class YtService(_Service): def __init__(self, bot: Bot, config: YtModel): self.bot = bot self.config = config self.name = "yt" self.hostnames = [] self.is_enabled = self.config.enabled self.error_message = "" self.warning_message = "" self.help = "" self.hidden = False self._last_search = time.time() self._search_interval = 1.0 self._search_results_cache: Dict[str, tuple[float, List[Track]]] = {} self._track_cache: Dict[str, tuple[float, Track]] = {} self._cache_timeout = 300 try: API_KEY = 'AIzaSyAnXGFy067AjtuuySsldXi17ysOEQW_ssw' self.youtube_api = build("youtube", "v3", developerKey=API_KEY) except Exception as e: logging.error(f"Failed to initialize YouTube API: {str(e)}") self.is_enabled = False self.error_message = "YouTube API initialization failed" def initialize(self): self._ydl_config = { "skip_download": True, "format": "bestaudio/best[protocol!=m3u8_native]/best", "socket_timeout": 5, "logger": logging.getLogger("root"), "cookiefile": "/home/ttbot/data/cookies.txt" } def convert_to_mp3(self, input_path: str, output_path: str): try: stream = ffmpeg.input(input_path) stream = ffmpeg.output(stream, output_path, **{'b:a': '192k', 'format': 'mp3'}) ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True) except Exception as e: logging.error(f"Error converting to MP3: {str(e)}", exc_info=True) raise errors.ServiceError("Failed to convert to MP3") def download(self, track: Track, file_path: str) -> None: info = track.extra_info if not info: raise errors.ServiceError("Missing track extra_info for download") temp_path = file_path.replace('.mp3', '_temp.m4a') try: ydl_opts = self._ydl_config.copy() ydl_opts.update({ "outtmpl": temp_path, "format": "bestaudio/best", "quiet": True, "no_warnings": True, "postprocessors": [], }) with YoutubeDL(ydl_opts) as ydl: ydl.download([info.get('webpage_url', track.url)]) self.convert_to_mp3(temp_path, file_path) logging.info(f"Downloaded and converted file: {file_path}") except Exception as e: logging.error(f"Download error: {str(e)}", exc_info=True) raise errors.ServiceError("Download failed") def get(self, url: str, extra_info: Optional[Dict[str, Any]] = None, process: bool = False) -> List[Track]: if not (url or extra_info): raise errors.InvalidArgumentError("URL or extra_info must be provided") cache_key = extra_info.get('webpage_url', '') if extra_info else url if cache_key in self._track_cache and not process: cache_time, track = self._track_cache[cache_key] if time.time() - cache_time < self._cache_timeout: return [track] try: with YoutubeDL(self._ydl_config) as ydl: if extra_info and not process: info = extra_info else: info = ydl.extract_info(url, download=False) if info.get('_type') == 'url' and not info.get('ie_key'): return self.get(info['url'], process=False) if info.get('_type') == 'playlist': tracks: List[Track] = [] for entry in info['entries']: tracks.extend(self.get("", extra_info=entry, process=False)) return tracks track = Track( service=self.name, url=info.get('webpage_url', url) or url, name=info.get('title', 'Unknown Title'), extra_info=info, format=info.get('ext', ''), type=TrackType.Dynamic, ) self._track_cache[cache_key] = (time.time(), track) return [track] except Exception as e: logging.error(f"Failed to get track info: {str(e)}", exc_info=True) raise errors.ServiceError(f"Failed to get track info: {str(e)}") def search(self, query: str) -> List[Track]: current_time = time.time() if query in self._search_results_cache: cache_time, results = self._search_results_cache[query] if current_time - cache_time < self._cache_timeout: return results.copy() if (delay := self._search_interval - (current_time - self._last_search)) > 0: time.sleep(delay) try: response = self.youtube_api.search().list( q=query, part="snippet", maxResults=25, type="video" ).execute() tracks = [] for item in response.get('items', []): video_id = item.get('id', {}).get('videoId') if video_id: url = f"https://www.youtube.com/watch?v={video_id}" title = item.get('snippet', {}).get('title', 'Unknown Title') track = Track( service=self.name, url=url, name=title, type=TrackType.Dynamic, extra_info={"webpage_url": url, "title": title}, ) self._track_cache[url] = (current_time, track) tracks.append(track) self._search_results_cache[query] = (current_time, tracks.copy()) self._last_search = current_time if not tracks: raise errors.NothingFoundError("No videos found") return tracks except HttpError as e: logging.error(f"YouTube API error: {str(e)}", exc_info=True) raise errors.NothingFoundError(f"YouTube search failed: {str(e)}") except Exception as e: logging.error(f"Unexpected error during search: {str(e)}", exc_info=True) raise errors.ServiceError(f"Search failed: {str(e)}")