from __future__ import annotations

import html
import logging
import random
import time
from typing import Any, Callable, Dict, List, Optional, TYPE_CHECKING

import mpv

from bot import errors
from bot.player.enums import Mode, State, TrackType
from bot.player.track import Track
from bot.sound_devices import SoundDevice, SoundDeviceType

if TYPE_CHECKING:
    from bot import Bot


class Player:
    """Audio player built on ``mpv.MPV``.

    Keeps the current track list, the index of the playing track, the
    playback state/mode and the audio effects (equalizer preset, speed
    level, "8D" flag) selected by the user.
    """

    def __init__(self, bot: Bot):
        """Create the mpv instance and reset all playback state.

        Args:
            bot: Owning bot. Its ``config.player``, ``cache`` and
                ``cache_manager`` attributes are stored on the player.
        """
        self.config = bot.config.player
        self.cache = bot.cache
        self.cache_manager = bot.cache_manager
        mpv_options = {
            "demuxer_lavf_o": "http_persistent=false",
            "demuxer_max_back_bytes": 1048576,
            "demuxer_max_bytes": 2097152,
            "video": False,
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
            "ytdl": False,
        }
        # User-configured options override the defaults above.
        mpv_options.update(self.config.player_options)
        try:
            self._player = mpv.MPV(**mpv_options, log_handler=self.log_handler)
        except AttributeError:
            # Retry without the back-buffer limit.
            # NOTE(review): presumably older libmpv builds do not know this
            # option -- confirm against the python-mpv documentation.
            mpv_options.pop("demuxer_max_back_bytes", None)
            self._player = mpv.MPV(**mpv_options, log_handler=self.log_handler)
        self._log_level = 5
        self.track_list: List[Track] = []
        self.track: Track = Track()
        self.track_index: int = -1
        self.state = State.Stopped
        self.mode = Mode.TrackList
        self.volume = self.config.default_volume
        # True once the list is being built through enqueue() rather than
        # replaced wholesale by play().
        self.manual_queue = False
        self.eq_presets = {
            'off': '',  # Used to turn the EQ off
            'default': 'equalizer=gains=[0 0 0 0 0 0 0 0 0 0]',
            'rock': 'equalizer=gains=[4 2 1 -2 -3 -1 2 4 5 5]',
            'pop': 'equalizer=gains=[-1 1 3 4 3 1 -1 -1 -2 -2]',
            'jazz': 'equalizer=gains=[3 2 1 -1 -2 -2 1 3 4 4]',
            'classic': 'equalizer=gains=[-2 -2 -2 -1 1 3 4 4 5 5]',
            'bass': 'equalizer=gains=[6 5 4 2 1 0 0 0 0 0]'
        }
        self.current_eq = 'off'
        self.speed_level = 0  # 0 is normal, positive is faster, negative is slower
        self.is_8d_enabled = False

    def initialize(self) -> None:
        """Log the initialization of the player; performs no other work."""
        logging.debug("Initializing player")
        logging.debug("Player initialized")

    def run(self) -> None:
        """Register the end-of-file callback and the metadata observers."""
        logging.debug("Registering player callbacks")
        self.register_event_callback("end-file", self.on_end_file)
        self._player.observe_property("metadata", self.on_metadata_update)
        self._player.observe_property("media-title", self.on_metadata_update)
        logging.debug("Player callbacks registered")

    def set_eq(self, preset_name: str) -> bool:
        """Sets the audio equalizer preset.

        Args:
            preset_name: Key of ``self.eq_presets``; matched case-insensitively.

        Returns:
            True if the preset exists and was applied, False otherwise.
        """
        preset_name = preset_name.lower()
        if preset_name not in self.eq_presets:
            return False
        self.current_eq = preset_name
        self._update_audio_filters()
        return True

    def set_8d(self, enable: bool) -> None:
        """Enables or disables the 8D audio effect."""
        self.is_8d_enabled = enable
        self._update_audio_filters()

    def enqueue(self, tracks: List[Track]) -> None:
        """Adds tracks to the queue, clearing the old playlist if needed.

        When nothing is playing (or the list is empty) the tracks start
        playing immediately. Otherwise, on the first manual enqueue the list
        is cut down to the currently playing track before the new tracks are
        appended. An empty ``tracks`` list is ignored.
        """
        if not tracks:
            # Nothing to add; avoids play([]) indexing into an empty list.
            return
        if self.state == State.Stopped or not self.track_list:
            self.play(tracks)
            # play() resets the flag, so switch to manual queue mode afterwards.
            self.manual_queue = True
            return
        if not self.manual_queue:
            now_playing_track = self.track_list[self.track_index]
            self.track_list = [now_playing_track]
            self.track_index = 0  # Reset index
            self.manual_queue = True  # Activate manual queue mode
        # Now, just add the new track(s) to the end of the (possibly new) list.
        self.track_list.extend(tracks)

    def close(self) -> None:
        """Stop playback if necessary and terminate the mpv instance."""
        logging.debug("Closing player")
        if self.state != State.Stopped:
            self.stop()
        self._player.terminate()
        logging.debug("Player closed")

    def play(
        self,
        tracks: Optional[List[Track]] = None,
        start_track_index: Optional[int] = None,
    ) -> None:
        """Start playing a new track list, or resume the current track.

        Args:
            tracks: New track list. When None, playback is only un-paused.
            start_track_index: Index of the first track to play. A falsy
                value (None or 0) means "not specified": in ``Mode.Random``
                the list is shuffled and the first shuffled index is used,
                otherwise index 0 is used.
        """
        if tracks is not None:
            self.manual_queue = False
            self.track_list = tracks
            if not start_track_index and self.mode == Mode.Random:
                self.shuffle(True)
                self.track_index = self._index_list[0]
                self.track = self.track_list[self.track_index]
            else:
                self.track_index = start_track_index if start_track_index else 0
                self.track = tracks[self.track_index]
            self._play(self.track.url)
        else:
            self._player.pause = False
        self._player.volume = self.volume
        self.state = State.Playing

    def pause(self) -> None:
        """Pause playback."""
        self.state = State.Paused
        self._player.pause = True

    def stop(self) -> None:
        """Stop playback and clear the track list and current track."""
        self.state = State.Stopped
        self._player.stop()
        self.track_list = []
        self.track = Track()
        self.track_index = -1

    def _play(self, arg: str, save_to_recents: bool = True) -> None:
        """Hand ``arg`` to mpv, optionally recording the current track in recents.

        Args:
            arg: Value passed to ``mpv.MPV.play`` (callers pass the track URL).
            save_to_recents: When True, the current track's raw form is
                appended to ``self.cache.recents`` unless the last entry
                already compares equal to it, and the cache is saved.
        """
        if save_to_recents:
            current_track = self.track_list[self.track_index]
            try:
                is_new_entry = self.cache.recents[-1] != current_track
            except IndexError:
                # Recents is still empty, so there is nothing to compare with.
                is_new_entry = True
            if is_new_entry:
                self.cache.recents.append(current_track.get_raw())
            self.cache_manager.save()
        self._player.pause = False
        self._player.play(arg)

    def next(self) -> None:
        """Play the next track according to the current mode.

        Raises:
            errors.NoNextTrackError: If there is no next track and the mode
                is not ``Mode.RepeatTrackList``.
        """
        track_index = self.track_index
        if self.track_list:
            if self.mode == Mode.Random:
                try:
                    position = self._index_list.index(self.track_index)
                    track_index = self._index_list[position + 1]
                except IndexError:
                    # Ran past the end of the shuffled order.
                    track_index = 0
            else:
                track_index += 1
        else:
            track_index = 0
        try:
            self.play_by_index(track_index)
        except errors.IncorrectTrackIndexError:
            if self.mode == Mode.RepeatTrackList:
                self.play_by_index(0)
            else:
                raise errors.NoNextTrackError()

    def previous(self) -> None:
        """Play the previous track according to the current mode.

        Raises:
            errors.NoPreviousTrackError: If there is no previous track and
                the mode is not ``Mode.RepeatTrackList``.
        """
        track_index = self.track_index
        if self.track_list:
            if self.mode == Mode.Random:
                try:
                    position = self._index_list.index(self.track_index)
                    track_index = self._index_list[position - 1]
                except IndexError:
                    track_index = len(self.track_list) - 1
            else:
                if track_index == 0 and self.mode != Mode.RepeatTrackList:
                    raise errors.NoPreviousTrackError
                # In repeat mode this may become -1, i.e. the last track.
                track_index -= 1
        else:
            track_index = 0
        try:
            self.play_by_index(track_index)
        except errors.IncorrectTrackIndexError:
            if self.mode == Mode.RepeatTrackList:
                self.play_by_index(len(self.track_list) - 1)
            else:
                raise errors.NoPreviousTrackError

    def play_by_index(self, index: int) -> None:
        """Play the track at ``index`` of the track list.

        Args:
            index: Position in ``self.track_list``; negative values count
                from the end, as with normal list indexing.

        Raises:
            errors.IncorrectTrackIndexError: If ``index`` is out of range.
        """
        track_count = len(self.track_list)
        if not -track_count <= index < track_count:
            raise errors.IncorrectTrackIndexError()
        self.track = self.track_list[index]
        # Normalise negative indices arithmetically. Looking the track up with
        # list.index() would return the FIRST equal entry, which is the wrong
        # position when the same track is queued more than once.
        self.track_index = index % track_count
        self._play(self.track.url)
        self.state = State.Playing

    def set_volume(self, volume: int) -> None:
        """Set the playback volume, optionally fading towards it.

        Args:
            volume: Requested volume. Clamped to ``[0, config.max_volume]``.
        """
        volume = max(0, min(volume, self.config.max_volume))
        self.volume = volume
        if self.config.volume_fading:
            current = int(self._player.volume)
            step = 1 if self._player.volume < volume else -1
            for level in range(current, volume, step):
                self._player.volume = level
                time.sleep(self.config.volume_fading_interval)
        # range() excludes its stop value, so the target itself must be set
        # explicitly; this also covers the non-fading case.
        self._player.volume = volume

    def get_speed(self) -> float:
        """Return the current mpv playback speed."""
        return self._player.speed

    def set_speed(self, level: int) -> None:
        """Sets the playback speed and pitch based on a level.

        Args:
            level: Speed level, clamped to ``[-5, 5]``. Each step changes the
                speed multiplier by 0.10; 0 restores normal speed.
        """
        level = max(-5, min(level, 5))
        self.speed_level = level
        if level == 0:
            self._player.speed = 1.0
            # Just call update; per the original author's note it implicitly
            # removes 'scaletempo'.
            self._update_audio_filters()
        else:
            speed_multiplier = 1.0 + (level * 0.10)
            self._player.speed = speed_multiplier
            # Add the 'scaletempo' filter; per the original author's note this
            # disables mpv's built-in pitch correction.
            self._update_audio_filters(add_filters=['scaletempo'])

    def _update_audio_filters(self, add_filters: list = None, remove_filters: list = None):
        """Builds and applies the audio filter string based on active effects."""
        active_filters = []
        # 1. Check equalizer
        if self.current_eq != 'off':
            active_filters.append(self.eq_presets[self.current_eq])
        # 2. Check 8D audio
        if self.is_8d_enabled:
            # 8D filter.
            # NOTE(review): this method continues beyond the reviewed span and
            # was not modified. ``pass`` is an inert placeholder that only
            # keeps this fragment syntactically complete; it can be dropped
            # once the remainder of the method is re-joined below.
            pass
Kecepatan 0.2Hz adalah awal yang baik. active_filters.append('pan=stereo|c0