From 330e2acf482cbfc4d59eefcb1b382120da9287a3 Mon Sep 17 00:00:00 2001 From: Umiko Date: Mon, 5 May 2025 05:15:38 +0700 Subject: [PATCH] changed the extention file for youtube download. --- bot/services/no_yt.py | 191 ++++++++++++++++++++++++++++++++++++++++++ bot/services/yt.py | 35 +++++--- changelog.txt | 1 + requirements.txt | 1 + 4 files changed, 215 insertions(+), 13 deletions(-) create mode 100644 bot/services/no_yt.py diff --git a/bot/services/no_yt.py b/bot/services/no_yt.py new file mode 100644 index 0000000..e75c0cd --- /dev/null +++ b/bot/services/no_yt.py @@ -0,0 +1,191 @@ +from __future__ import annotations +import logging +import time +from typing import Any, Dict, List, Optional, TYPE_CHECKING +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError + +if TYPE_CHECKING: + from bot import Bot + +from yt_dlp import YoutubeDL +from yt_dlp.downloader import get_suitable_downloader +from bot.config.models import YtModel +from bot.player.enums import TrackType +from bot.player.track import Track +from bot.services import Service as _Service +from bot import errors + +class YtService(_Service): + def __init__(self, bot: Bot, config: YtModel): + self.bot = bot + self.config = config + self.name = "yt" + self.hostnames = [] + self.is_enabled = self.config.enabled + self.error_message = "" + self.warning_message = "" + self.help = "" + self.hidden = False + + self._last_search = time.time() + self._search_interval = 1.0 + self._search_results_cache = {} + self._track_cache = {} + self._cache_timeout = 300 + + API_KEY = 'AIzaSyAnXGFy067AjtuuySsldXi17ysOEQW_ssw' + try: + self.youtube_api = build("youtube", "v3", developerKey=API_KEY) + except Exception as e: + logging.error(f"Failed to initialize YouTube API: {str(e)}") + self.is_enabled = False + self.error_message = "YouTube API initialization failed" + + def initialize(self): + self._ydl_config = { + "skip_download": True, + "format": "m4a/bestaudio/best[protocol!=m3u8_native]/best", + "socket_timeout": 5, + "logger": logging.getLogger("root"), + "cookiefile": "/home/ttbot/data/cookies.txt" + } + + def download(self, track: Track, file_path: str) -> None: + try: + info = track.extra_info + if not info: + super().download(track, file_path) + return + + with YoutubeDL(self._ydl_config) as ydl: + dl = get_suitable_downloader(info)(ydl, self._ydl_config) + dl.download(file_path, info) + except Exception as e: + logging.error(f"Download error: {str(e)}", exc_info=True) + raise errors.ServiceError("Download failed") + + def get( + self, + url: str, + extra_info: Optional[Dict[str, Any]] = None, + process: bool = False, + ) -> List[Track]: + if not (url or extra_info): + raise errors.InvalidArgumentError() + + cache_key = url or extra_info.get('webpage_url', '') if extra_info else '' + if cache_key in self._track_cache and not process: + cache_time, track = self._track_cache[cache_key] + if time.time() - cache_time < self._cache_timeout: + return [track] + + try: + with YoutubeDL(self._ydl_config) as ydl: + if process and extra_info: + info = ydl.extract_info(extra_info.get('webpage_url', url), process=False) + elif not extra_info: + info = ydl.extract_info(url, process=False) + else: + info = extra_info + + info_type = info.get("_type") + + if info_type == "url" and not info.get("ie_key"): + return self.get(info["url"], process=False) + + elif info_type == "playlist": + tracks: List[Track] = [] + for entry in info["entries"]: + data = self.get("", extra_info=entry, process=False) + tracks.extend(data) + return tracks + + if not process: + track = Track( + service=self.name, + url=info.get('webpage_url', url) or url, + name=info.get('title', ''), + extra_info=info, + format=info.get('ext', ''), + type=TrackType.Dynamic + ) + + self._track_cache[cache_key] = (time.time(), track) + return [track] + + stream = ydl.process_ie_result(info, download=False) + + if "url" not in stream: + raise errors.ServiceError("No URL found in stream info") + + title = stream.get("title", "") + if stream.get("uploader"): + title += f" - {stream['uploader']}" + + track_type = TrackType.Live if stream.get("is_live") else TrackType.Default + + track = Track( + service=self.name, + url=stream["url"], + name=title, + format=stream.get("ext", ""), + type=track_type, + extra_info=stream + ) + + self._track_cache[cache_key] = (time.time(), track) + return [track] + + except Exception as e: + logging.error(f"Error getting track info: {str(e)}", exc_info=True) + raise errors.ServiceError(f"Failed to get track info: {str(e)}") + + def search(self, query: str) -> List[Track]: + current_time = time.time() + + if query in self._search_results_cache: + cache_time, results = self._search_results_cache[query] + if current_time - cache_time < self._cache_timeout: + return results.copy() + + time_since_last = current_time - self._last_search + if time_since_last < self._search_interval: + time.sleep(self._search_interval - time_since_last) + + try: + search_response = self.youtube_api.search().list( + q=query, + part="snippet", + maxResults=25, + type="video" + ).execute() + + tracks = [] + for search_result in search_response.get("items", []): + if 'videoId' in search_result.get('id', {}): + video_url = f"https://www.youtube.com/watch?v={search_result['id']['videoId']}" + track = Track( + service=self.name, + url=video_url, + name=search_result["snippet"]["title"], + type=TrackType.Dynamic, + extra_info={"webpage_url": video_url, "title": search_result["snippet"]["title"]} + ) + + self._track_cache[video_url] = (current_time, track.get_raw()) + tracks.append(track) + + self._search_results_cache[query] = (current_time, tracks.copy()) + self._last_search = current_time + + if tracks: + return tracks + raise errors.NothingFoundError("No videos found") + + except HttpError as e: + logging.error(f"YouTube API error: {str(e)}", exc_info=True) + raise errors.NothingFoundError(f"YouTube search failed: {str(e)}") + except Exception as e: + logging.error(f"Unexpected error in search: {str(e)}", exc_info=True) + raise errors.ServiceError(f"Search failed: {str(e)}") \ No newline at end of file diff --git a/bot/services/yt.py b/bot/services/yt.py index e75c0cd..b33ab91 100644 --- a/bot/services/yt.py +++ b/bot/services/yt.py @@ -4,10 +4,6 @@ import time from typing import Any, Dict, List, Optional, TYPE_CHECKING from googleapiclient.discovery import build from googleapiclient.errors import HttpError - -if TYPE_CHECKING: - from bot import Bot - from yt_dlp import YoutubeDL from yt_dlp.downloader import get_suitable_downloader from bot.config.models import YtModel @@ -15,6 +11,10 @@ from bot.player.enums import TrackType from bot.player.track import Track from bot.services import Service as _Service from bot import errors +import ffmpeg # Import ffmpeg-python untuk konversi MP3 + +if TYPE_CHECKING: + from bot import Bot class YtService(_Service): def __init__(self, bot: Bot, config: YtModel): @@ -45,12 +45,19 @@ class YtService(_Service): def initialize(self): self._ydl_config = { "skip_download": True, - "format": "m4a/bestaudio/best[protocol!=m3u8_native]/best", + "format": "bestaudio/best[protocol!=m3u8_native]/best", # Ganti ke format terbaik audio "socket_timeout": 5, "logger": logging.getLogger("root"), "cookiefile": "/home/ttbot/data/cookies.txt" } + def convert_to_mp3(self, input_path: str, output_path: str): + try: + ffmpeg.input(input_path).output(output_path, audio_bitrate='192k').run() + except Exception as e: + logging.error(f"Error converting to MP3: {str(e)}", exc_info=True) + raise errors.ServiceError("Failed to convert to MP3") + def download(self, track: Track, file_path: str) -> None: try: info = track.extra_info @@ -60,17 +67,19 @@ class YtService(_Service): with YoutubeDL(self._ydl_config) as ydl: dl = get_suitable_downloader(info)(ydl, self._ydl_config) - dl.download(file_path, info) + # Mengunduh audio dalam format terbaik + temp_path = file_path.replace('.mp3', '_temp.m4a') # Unduh sementara dalam format m4a + dl.download(temp_path, info) + + # Setelah mengunduh file m4a, konversi ke MP3 + self.convert_to_mp3(temp_path, file_path) + logging.info(f"File downloaded and converted to MP3: {file_path}") + except Exception as e: logging.error(f"Download error: {str(e)}", exc_info=True) raise errors.ServiceError("Download failed") - def get( - self, - url: str, - extra_info: Optional[Dict[str, Any]] = None, - process: bool = False, - ) -> List[Track]: + def get(self, url: str, extra_info: Optional[Dict[str, Any]] = None, process: bool = False) -> List[Track]: if not (url or extra_info): raise errors.InvalidArgumentError() @@ -188,4 +197,4 @@ class YtService(_Service): raise errors.NothingFoundError(f"YouTube search failed: {str(e)}") except Exception as e: logging.error(f"Unexpected error in search: {str(e)}", exc_info=True) - raise errors.ServiceError(f"Search failed: {str(e)}") \ No newline at end of file + raise errors.ServiceError(f"Search failed: {str(e)}") diff --git a/changelog.txt b/changelog.txt index bc35152..6a1a79e 100644 --- a/changelog.txt +++ b/changelog.txt @@ -1,5 +1,6 @@ This change log is written to find out the changes that have been made by Pandora, and the source code still refers to TTMediaBot. 5/5/2025 +Change the download from m4a to mp3, if it's work. Added new command: e, to add either link or new song to the extended track. if nothing is playing, it plays the track directly. Fix user rights to upload file, hopefully... diff --git a/requirements.txt b/requirements.txt index 291aeea..74c6971 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ google-api-python-client yt-dlp crontab humanize == 4.6 +ffmpeg-python \ No newline at end of file