changed the extention file for youtube download.
Some checks are pending
Build-nightly / docker (push) Waiting to run

This commit is contained in:
Umiko 2025-05-05 05:15:38 +07:00
parent 9762856357
commit 330e2acf48
4 changed files with 215 additions and 13 deletions

191
bot/services/no_yt.py Normal file
View File

@ -0,0 +1,191 @@
from __future__ import annotations
import logging
import time
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
if TYPE_CHECKING:
from bot import Bot
from yt_dlp import YoutubeDL
from yt_dlp.downloader import get_suitable_downloader
from bot.config.models import YtModel
from bot.player.enums import TrackType
from bot.player.track import Track
from bot.services import Service as _Service
from bot import errors
class YtService(_Service):
def __init__(self, bot: Bot, config: YtModel):
self.bot = bot
self.config = config
self.name = "yt"
self.hostnames = []
self.is_enabled = self.config.enabled
self.error_message = ""
self.warning_message = ""
self.help = ""
self.hidden = False
self._last_search = time.time()
self._search_interval = 1.0
self._search_results_cache = {}
self._track_cache = {}
self._cache_timeout = 300
API_KEY = 'AIzaSyAnXGFy067AjtuuySsldXi17ysOEQW_ssw'
try:
self.youtube_api = build("youtube", "v3", developerKey=API_KEY)
except Exception as e:
logging.error(f"Failed to initialize YouTube API: {str(e)}")
self.is_enabled = False
self.error_message = "YouTube API initialization failed"
def initialize(self):
self._ydl_config = {
"skip_download": True,
"format": "m4a/bestaudio/best[protocol!=m3u8_native]/best",
"socket_timeout": 5,
"logger": logging.getLogger("root"),
"cookiefile": "/home/ttbot/data/cookies.txt"
}
def download(self, track: Track, file_path: str) -> None:
try:
info = track.extra_info
if not info:
super().download(track, file_path)
return
with YoutubeDL(self._ydl_config) as ydl:
dl = get_suitable_downloader(info)(ydl, self._ydl_config)
dl.download(file_path, info)
except Exception as e:
logging.error(f"Download error: {str(e)}", exc_info=True)
raise errors.ServiceError("Download failed")
def get(
self,
url: str,
extra_info: Optional[Dict[str, Any]] = None,
process: bool = False,
) -> List[Track]:
if not (url or extra_info):
raise errors.InvalidArgumentError()
cache_key = url or extra_info.get('webpage_url', '') if extra_info else ''
if cache_key in self._track_cache and not process:
cache_time, track = self._track_cache[cache_key]
if time.time() - cache_time < self._cache_timeout:
return [track]
try:
with YoutubeDL(self._ydl_config) as ydl:
if process and extra_info:
info = ydl.extract_info(extra_info.get('webpage_url', url), process=False)
elif not extra_info:
info = ydl.extract_info(url, process=False)
else:
info = extra_info
info_type = info.get("_type")
if info_type == "url" and not info.get("ie_key"):
return self.get(info["url"], process=False)
elif info_type == "playlist":
tracks: List[Track] = []
for entry in info["entries"]:
data = self.get("", extra_info=entry, process=False)
tracks.extend(data)
return tracks
if not process:
track = Track(
service=self.name,
url=info.get('webpage_url', url) or url,
name=info.get('title', ''),
extra_info=info,
format=info.get('ext', ''),
type=TrackType.Dynamic
)
self._track_cache[cache_key] = (time.time(), track)
return [track]
stream = ydl.process_ie_result(info, download=False)
if "url" not in stream:
raise errors.ServiceError("No URL found in stream info")
title = stream.get("title", "")
if stream.get("uploader"):
title += f" - {stream['uploader']}"
track_type = TrackType.Live if stream.get("is_live") else TrackType.Default
track = Track(
service=self.name,
url=stream["url"],
name=title,
format=stream.get("ext", ""),
type=track_type,
extra_info=stream
)
self._track_cache[cache_key] = (time.time(), track)
return [track]
except Exception as e:
logging.error(f"Error getting track info: {str(e)}", exc_info=True)
raise errors.ServiceError(f"Failed to get track info: {str(e)}")
def search(self, query: str) -> List[Track]:
current_time = time.time()
if query in self._search_results_cache:
cache_time, results = self._search_results_cache[query]
if current_time - cache_time < self._cache_timeout:
return results.copy()
time_since_last = current_time - self._last_search
if time_since_last < self._search_interval:
time.sleep(self._search_interval - time_since_last)
try:
search_response = self.youtube_api.search().list(
q=query,
part="snippet",
maxResults=25,
type="video"
).execute()
tracks = []
for search_result in search_response.get("items", []):
if 'videoId' in search_result.get('id', {}):
video_url = f"https://www.youtube.com/watch?v={search_result['id']['videoId']}"
track = Track(
service=self.name,
url=video_url,
name=search_result["snippet"]["title"],
type=TrackType.Dynamic,
extra_info={"webpage_url": video_url, "title": search_result["snippet"]["title"]}
)
self._track_cache[video_url] = (current_time, track.get_raw())
tracks.append(track)
self._search_results_cache[query] = (current_time, tracks.copy())
self._last_search = current_time
if tracks:
return tracks
raise errors.NothingFoundError("No videos found")
except HttpError as e:
logging.error(f"YouTube API error: {str(e)}", exc_info=True)
raise errors.NothingFoundError(f"YouTube search failed: {str(e)}")
except Exception as e:
logging.error(f"Unexpected error in search: {str(e)}", exc_info=True)
raise errors.ServiceError(f"Search failed: {str(e)}")

View File

@ -4,10 +4,6 @@ import time
from typing import Any, Dict, List, Optional, TYPE_CHECKING from typing import Any, Dict, List, Optional, TYPE_CHECKING
from googleapiclient.discovery import build from googleapiclient.discovery import build
from googleapiclient.errors import HttpError from googleapiclient.errors import HttpError
if TYPE_CHECKING:
from bot import Bot
from yt_dlp import YoutubeDL from yt_dlp import YoutubeDL
from yt_dlp.downloader import get_suitable_downloader from yt_dlp.downloader import get_suitable_downloader
from bot.config.models import YtModel from bot.config.models import YtModel
@ -15,6 +11,10 @@ from bot.player.enums import TrackType
from bot.player.track import Track from bot.player.track import Track
from bot.services import Service as _Service from bot.services import Service as _Service
from bot import errors from bot import errors
import ffmpeg # Import ffmpeg-python untuk konversi MP3
if TYPE_CHECKING:
from bot import Bot
class YtService(_Service): class YtService(_Service):
def __init__(self, bot: Bot, config: YtModel): def __init__(self, bot: Bot, config: YtModel):
@ -45,12 +45,19 @@ class YtService(_Service):
def initialize(self): def initialize(self):
self._ydl_config = { self._ydl_config = {
"skip_download": True, "skip_download": True,
"format": "m4a/bestaudio/best[protocol!=m3u8_native]/best", "format": "bestaudio/best[protocol!=m3u8_native]/best", # Ganti ke format terbaik audio
"socket_timeout": 5, "socket_timeout": 5,
"logger": logging.getLogger("root"), "logger": logging.getLogger("root"),
"cookiefile": "/home/ttbot/data/cookies.txt" "cookiefile": "/home/ttbot/data/cookies.txt"
} }
def convert_to_mp3(self, input_path: str, output_path: str):
try:
ffmpeg.input(input_path).output(output_path, audio_bitrate='192k').run()
except Exception as e:
logging.error(f"Error converting to MP3: {str(e)}", exc_info=True)
raise errors.ServiceError("Failed to convert to MP3")
def download(self, track: Track, file_path: str) -> None: def download(self, track: Track, file_path: str) -> None:
try: try:
info = track.extra_info info = track.extra_info
@ -60,17 +67,19 @@ class YtService(_Service):
with YoutubeDL(self._ydl_config) as ydl: with YoutubeDL(self._ydl_config) as ydl:
dl = get_suitable_downloader(info)(ydl, self._ydl_config) dl = get_suitable_downloader(info)(ydl, self._ydl_config)
dl.download(file_path, info) # Mengunduh audio dalam format terbaik
temp_path = file_path.replace('.mp3', '_temp.m4a') # Unduh sementara dalam format m4a
dl.download(temp_path, info)
# Setelah mengunduh file m4a, konversi ke MP3
self.convert_to_mp3(temp_path, file_path)
logging.info(f"File downloaded and converted to MP3: {file_path}")
except Exception as e: except Exception as e:
logging.error(f"Download error: {str(e)}", exc_info=True) logging.error(f"Download error: {str(e)}", exc_info=True)
raise errors.ServiceError("Download failed") raise errors.ServiceError("Download failed")
def get( def get(self, url: str, extra_info: Optional[Dict[str, Any]] = None, process: bool = False) -> List[Track]:
self,
url: str,
extra_info: Optional[Dict[str, Any]] = None,
process: bool = False,
) -> List[Track]:
if not (url or extra_info): if not (url or extra_info):
raise errors.InvalidArgumentError() raise errors.InvalidArgumentError()
@ -188,4 +197,4 @@ class YtService(_Service):
raise errors.NothingFoundError(f"YouTube search failed: {str(e)}") raise errors.NothingFoundError(f"YouTube search failed: {str(e)}")
except Exception as e: except Exception as e:
logging.error(f"Unexpected error in search: {str(e)}", exc_info=True) logging.error(f"Unexpected error in search: {str(e)}", exc_info=True)
raise errors.ServiceError(f"Search failed: {str(e)}") raise errors.ServiceError(f"Search failed: {str(e)}")

View File

@ -1,5 +1,6 @@
This change log is written to find out the changes that have been made by Pandora, and the source code still refers to TTMediaBot. This change log is written to find out the changes that have been made by Pandora, and the source code still refers to TTMediaBot.
5/5/2025 5/5/2025
Change the download from m4a to mp3, if it's work.
Added new command: e, to add either link or new song to the extended track. if nothing is playing, it plays the track directly. Added new command: e, to add either link or new song to the extended track. if nothing is playing, it plays the track directly.
Fix user rights to upload file, hopefully... Fix user rights to upload file, hopefully...

View File

@ -11,3 +11,4 @@ google-api-python-client
yt-dlp yt-dlp
crontab crontab
humanize == 4.6 humanize == 4.6
ffmpeg-python