pushing new changes
Some checks are pending
Build-nightly / docker (push) Waiting to run

This commit is contained in:
Umiko 2025-08-24 16:16:32 +07:00
parent 776fb7da75
commit 5d2065ae85
7 changed files with 126 additions and 238 deletions

View File

@ -2,8 +2,6 @@ from enum import Enum, Flag
import TeamTalkPy import TeamTalkPy
major, minor, patch, build = TeamTalkPy.ttstr(TeamTalkPy.getVersion()).split(".")
class State(Enum): class State(Enum):
NOT_CONNECTED = 0 NOT_CONNECTED = 0
@ -147,7 +145,7 @@ class UserStatusMode(Flag):
N = 4096 N = 4096
class UserRightPre15(Flag): class UserRight(Flag):
Null = TeamTalkPy.UserRight.USERRIGHT_NONE Null = TeamTalkPy.UserRight.USERRIGHT_NONE
MultiLogin = TeamTalkPy.UserRight.USERRIGHT_MULTI_LOGIN MultiLogin = TeamTalkPy.UserRight.USERRIGHT_MULTI_LOGIN
ViewAllUsers = TeamTalkPy.UserRight.USERRIGHT_VIEW_ALL_USERS ViewAllUsers = TeamTalkPy.UserRight.USERRIGHT_VIEW_ALL_USERS
@ -174,75 +172,6 @@ class UserRightPre15(Flag):
ViewHiddenChannels = TeamTalkPy.UserRight.USERRIGHT_VIEW_HIDDEN_CHANNELS ViewHiddenChannels = TeamTalkPy.UserRight.USERRIGHT_VIEW_HIDDEN_CHANNELS
if major == "5" and minor >= "15":
class UserRight15(Flag):
Null = TeamTalkPy.UserRight.USERRIGHT_NONE
MultiLogin = TeamTalkPy.UserRight.USERRIGHT_MULTI_LOGIN
ViewAllUsers = TeamTalkPy.UserRight.USERRIGHT_VIEW_ALL_USERS
CreateTemporaryChannel = TeamTalkPy.UserRight.USERRIGHT_CREATE_TEMPORARY_CHANNEL
ModifyChannels = TeamTalkPy.UserRight.USERRIGHT_MODIFY_CHANNELS
BroadcastTextMessage = TeamTalkPy.UserRight.USERRIGHT_TEXTMESSAGE_BROADCAST
KickUsers = TeamTalkPy.UserRight.USERRIGHT_KICK_USERS
BanUsers = TeamTalkPy.UserRight.USERRIGHT_BAN_USERS
MoveUsers = TeamTalkPy.UserRight.USERRIGHT_MOVE_USERS
OperatorEnable = TeamTalkPy.UserRight.USERRIGHT_OPERATOR_ENABLE
UploadFiles = TeamTalkPy.UserRight.USERRIGHT_UPLOAD_FILES
DownloadFiles = TeamTalkPy.UserRight.USERRIGHT_DOWNLOAD_FILES
UpdateServerProperties = TeamTalkPy.UserRight.USERRIGHT_UPDATE_SERVERPROPERTIES
TransmitVoice = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_VOICE
TransmitVideoCapture = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_VIDEOCAPTURE
TransmitDesktop = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_DESKTOP
TransmitDesktopInput = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_DESKTOPINPUT
TransmitMediaFileAudio = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_MEDIAFILE_AUDIO
TransmitMediaFileVideo = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_MEDIAFILE_VIDEO
TransmitMediaFile = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_MEDIAFILE
LockedNickname = TeamTalkPy.UserRight.USERRIGHT_LOCKED_NICKNAME
LockedStatus = TeamTalkPy.UserRight.USERRIGHT_LOCKED_STATUS
RecordVoice = TeamTalkPy.UserRight.USERRIGHT_RECORD_VOICE
ViewHiddenChannels = TeamTalkPy.UserRight.USERRIGHT_VIEW_HIDDEN_CHANNELS
textMessageUser = TeamTalkPy.UserRight.USERRIGHT_TEXTMESSAGE_USER
textMessageChannel = TeamTalkPy.UserRight.USERRIGHT_TEXTMESSAGE_CHANNEL
UserRight = UserRight15
else:
UserRight = UserRightPre15
class UserRight15(Flag):
Null = TeamTalkPy.UserRight.USERRIGHT_NONE
MultiLogin = TeamTalkPy.UserRight.USERRIGHT_MULTI_LOGIN
ViewAllUsers = TeamTalkPy.UserRight.USERRIGHT_VIEW_ALL_USERS
CreateTemporaryChannel = TeamTalkPy.UserRight.USERRIGHT_CREATE_TEMPORARY_CHANNEL
ModifyChannels = TeamTalkPy.UserRight.USERRIGHT_MODIFY_CHANNELS
BroadcastTextMessage = TeamTalkPy.UserRight.USERRIGHT_TEXTMESSAGE_BROADCAST
KickUsers = TeamTalkPy.UserRight.USERRIGHT_KICK_USERS
BanUsers = TeamTalkPy.UserRight.USERRIGHT_BAN_USERS
MoveUsers = TeamTalkPy.UserRight.USERRIGHT_MOVE_USERS
OperatorEnable = TeamTalkPy.UserRight.USERRIGHT_OPERATOR_ENABLE
UploadFiles = TeamTalkPy.UserRight.USERRIGHT_UPLOAD_FILES
DownloadFiles = TeamTalkPy.UserRight.USERRIGHT_DOWNLOAD_FILES
UpdateServerProperties = TeamTalkPy.UserRight.USERRIGHT_UPDATE_SERVERPROPERTIES
TransmitVoice = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_VOICE
TransmitVideoCapture = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_VIDEOCAPTURE
TransmitDesktop = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_DESKTOP
TransmitDesktopInput = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_DESKTOPINPUT
TransmitMediaFileAudio = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_MEDIAFILE_AUDIO
TransmitMediaFileVideo = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_MEDIAFILE_VIDEO
TransmitMediaFile = TeamTalkPy.UserRight.USERRIGHT_TRANSMIT_MEDIAFILE
LockedNickname = TeamTalkPy.UserRight.USERRIGHT_LOCKED_NICKNAME
LockedStatus = TeamTalkPy.UserRight.USERRIGHT_LOCKED_STATUS
RecordVoice = TeamTalkPy.UserRight.USERRIGHT_RECORD_VOICE
ViewHiddenChannels = TeamTalkPy.UserRight.USERRIGHT_VIEW_HIDDEN_CHANNELS
textMessageUser = TeamTalkPy.UserRight.USERRIGHT_TEXTMESSAGE_USER
textMessageChannel = TeamTalkPy.UserRight.USERRIGHT_TEXTMESSAGE_CHANNEL
if major == "5" and minor >= "15":
UserRight = UserRight15
else:
UserRight = UserRightPre15
class UserAccount: class UserAccount:
def __init__( def __init__(
self, self,

View File

@ -1,75 +1,26 @@
from __future__ import annotations from __future__ import annotations
import os import os
import platform
from typing import Callable, TYPE_CHECKING from typing import Callable, TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from bot.translator import Translator from bot.translator import Translator
app_name = "pandorafox♾" app_name = "WalkMan"
app_version = "2.4.1" app_version = "2.3.5"
client_name = app_name + "-V (Version)" + app_version
def get_system_info() -> str:
system = platform.system()
release = platform.release()
version = platform.version()
machine = platform.machine() or "unknown"
processor = platform.processor() or "unknown"
architecture = platform.architecture()[0] # '64bit' or '32bit'
if system == "Windows":
try:
import winreg
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows NT\CurrentVersion")
product_name = winreg.QueryValueEx(key, "ProductName")[0]
release_id = winreg.QueryValueEx(key, "ReleaseId")[0]
system_str = f"{product_name} {release_id}"
except Exception:
system_str = f"Windows {release}"
elif system == "Linux":
try:
# Trying to get CPU info from /proc/cpuinfo
with open("/proc/cpuinfo") as f:
cpu_info = f.read()
# Get the processor information
processor = next((line.split(":")[1].strip() for line in cpu_info.splitlines() if "model name" in line), "unknown")
os_info = {}
with open("/etc/os-release") as f:
for line in f:
if "=" in line:
k, v = line.strip().split("=", 1)
os_info[k] = v.strip('"')
name = os_info.get("PRETTY_NAME") or os_info.get("NAME", "Linux")
system_str = name
except Exception:
system_str = f"Linux {release}"
elif system == "Darwin":
mac_ver = platform.mac_ver()[0]
system_str = f"macOS {mac_ver or release}"
else:
system_str = f"{system} {release}"
return f"{system_str} | Arch: {architecture} | Machine: {machine} | CPU: {processor}"
client_name = f"{app_name}-Version{app_version}-{get_system_info()}"
about_text: Callable[[Translator], str] = lambda translator: translator.translate( about_text: Callable[[Translator], str] = lambda translator: translator.translate(
"""\ """\
A media streaming bot for TeamTalk. A media streaming bot for TeamTalk.
Authors: Amir Gumerov, Vladislav Kopylov, Beqa Gozalishvili, Kirill Belousov. Authors: Amir Gumerov, Vladislav Kopylov, Beqa Gozalishvili, Kirill Belousov.
Home page: https://github.com/gumerov-amir/TTMediaBot Home page: https://github.com/gumerov-amir/TTMediaBot
Pandorabox writer: Rexya, Muhammad. Currently Running under: techlabs.lol (Leap over limits!)
Extended from pandoraBox (https://www.dropbox.com/scl/fi/w59od6p43v474cdqfllt1/PandoraBox.zip?rlkey=sghktp7rbuxknbz9b3v9lqfii&dl=1)
Currently maintain by Rafli On Techlabs git
Visit us on: https://git.techlabs.lol/radiant_code/teamtalkbot
License: MIT License\ License: MIT License\
""" """
) )
start_bottt: Callable[[Translator], str] = lambda translator: translator.translate( start_bottt: Callable[[Translator], str] = lambda translator: translator.translate(
"""\ """\
Hello there! Hello there!
I'm PandoraFox, your go-to companion for discovering amazing songs and audio through YouTube, Yandex Music (Currently unavailable), and VK. I'm PandoraBox, your go-to companion for discovering amazing songs and audio through YouTube, Yandex Music, and VK.
Hosted by TechLabsStudio, I'm all set to bring audio magic to your TeamTalk experience. Hosted by TechLabsStudio, I'm all set to bring audio magic to your TeamTalk experience.
To get started, simply send me a private message with a specific command. To get started, simply send me a private message with a specific command.
Here's how you can interact with me: Here's how you can interact with me:
@ -80,17 +31,25 @@ If you encounter any issues or want to chat with us, feel free to reach out.
Thank you for choosing our service, and have a fantastic day!\ Thank you for choosing our service, and have a fantastic day!\
""" """
) )
contacts_bot: Callable[[Translator], str] = lambda translator: translator.translate( contacts_bot: Callable[[Translator], str] = lambda translator: translator.translate(
"""\ """\
If you encounter any issues with this bot, please reach out to our dedicated technicians: If you encounter any issues with this bot, please reach out to our dedicated technicians:
🦊 Rafli:
rafli@techlabs.lol - Muhammad:
t.me/rafli_ir - WhatsApp: https://api.whatsapp.com/send?phone=6282156978782
- Telegram: https://t.me/muha_aku
- Rexya:
- WhatsApp: https://api.whatsapp.com/send?phone=6288222553434
- Email: rexya@infiartt.com
- rafli:
- email: rafli@techlabs.lol
- telegram: rafli_ir
Join the TTMediaBot Official Group on Telegram: https://t.me/TTMediaBot_chat\ Join the TTMediaBot Official Group on Telegram: https://t.me/TTMediaBot_chat\
""" """
) )
fallback_service = "yt" fallback_service = "yt"
loop_timeout = 0.01 loop_timeout = 0.01
max_message_length = 512 max_message_length = 512

View File

@ -12,37 +12,25 @@ import portalocker
if TYPE_CHECKING: if TYPE_CHECKING:
from bot.player.track import Track from bot.player.track import Track
cache_data_type = Dict[str, Any] cache_data_type = Dict[str, Any]
class Cache: class Cache:
def __init__(self, cache_data: cache_data_type): def __init__(self, cache_data: cache_data_type):
self.cache_version = cache_data.get("cache_version", CacheManager.version) self.cache_version = cache_data["cache_version"] if "cache_version" in cache_data else CacheManager.version
self.recents: deque[Track] = deque( self.recents: deque[Track] = (
cache_data.get("recents", []), cache_data["recents"]
maxlen=app_vars.recents_max_lenth if "recents" in cache_data
else deque(maxlen=app_vars.recents_max_lenth)
)
self.favorites: Dict[str, List[Track]] = (
cache_data["favorites"] if "favorites" in cache_data else {}
) )
self.favorites: Dict[str, List[Track]] = cache_data.get("favorites", {})
@property @property
def data(self): def data(self):
# Pastikan semua track di-recents & favorites adalah versi raw (non-stream) return {"cache_version": self.cache_version, "recents": self.recents, "favorites": self.favorites}
sanitized_recents = deque(
(track.get_raw() if hasattr(track, "get_raw") else track)
for track in self.recents
)
sanitized_favorites = {
user: [
track.get_raw() if hasattr(track, "get_raw") else track
for track in tracks
]
for user, tracks in self.favorites.items()
}
return {
"cache_version": self.cache_version,
"recents": sanitized_recents,
"favorites": sanitized_favorites
}
class CacheManager: class CacheManager:

View File

@ -40,7 +40,8 @@ class CommandProcessor:
"contacts": user_commands.ContactsBot, "contacts": user_commands.ContactsBot,
"help": user_commands.HelpCommand, "help": user_commands.HelpCommand,
"p": user_commands.PlayPauseCommand, "p": user_commands.PlayPauseCommand,
"e": user_commands.QueueCommand, "e": user_commands.EnqueueCommand,
"q": user_commands.QueueCommand,
"u": user_commands.PlayUrlCommand, "u": user_commands.PlayUrlCommand,
"sv": user_commands.ServiceCommand, "sv": user_commands.ServiceCommand,
"s": user_commands.StopCommand, "s": user_commands.StopCommand,

View File

@ -1,7 +1,6 @@
from __future__ import annotations from __future__ import annotations
from typing import List, Optional, TYPE_CHECKING from typing import List, Optional, TYPE_CHECKING
import os import os
import re
from bot.commands.command import Command from bot.commands.command import Command
from bot.player.enums import Mode, State, TrackType from bot.player.enums import Mode, State, TrackType
@ -116,6 +115,51 @@ class PlayUrlCommand(Command):
else: else:
raise errors.InvalidArgumentError raise errors.InvalidArgumentError
class EnqueueCommand(Command):
@property
def help(self) -> str:
return self.translator.translate(
"QUERY Finds a matching track and adds it to the queue"
)
def __call__(self, arg: str, user: User) -> Optional[str]:
if not arg:
raise errors.InvalidArgumentError
self.run_async(
self.ttclient.send_message,
self.translator.translate("Searching and adding to the queue..."),
user,
)
try:
track_list = self.service_manager.service.search(arg)
# Take only the first track from the search results to enqueue
track_to_queue = [track_list[0]]
# Check if the player was stopped to provide the correct response message
was_stopped = self.player.state == State.Stopped
# Call the enqueue function with the single track
self.run_async(self.player.enqueue, track_to_queue)
# Create the response message
if not was_stopped:
# If already playing, confirm the track was added to the queue
return self.translator.translate("Added to queue: {}").format(
track_to_queue[0].name
)
else:
# If it was stopped, the track will start playing immediately
return self.translator.translate("Playing {}").format(
track_to_queue[0].name
)
except errors.NothingFoundError:
return self.translator.translate("Nothing was found for your query")
except errors.ServiceError:
return self.translator.translate(
"The selected service is currently unavailable"
)
class StopCommand(Command): class StopCommand(Command):
@property @property
@ -583,67 +627,6 @@ class DownloadCommand(Command):
else: else:
return self.translator.translate("Nothing is playing") return self.translator.translate("Nothing is playing")
class QueueCommand(Command):
@property
def help(self) -> str:
return self.translator.translate(
"QUERY Adds a track to the queue. If no track is playing, plays immediately."
)
def __call__(self, arg: str, user: User) -> Optional[str]:
if not arg:
raise errors.InvalidArgumentError
self.run_async(
self.ttclient.send_message,
self.translator.translate("Searching..."),
user,
)
try:
if re.match(r'http[s]?://', arg):
# Kalau URL, langsung stream
tracks = self.module_manager.streamer.get(arg, user.is_admin if user is not None else True)
self.player.add_to_queue(tracks)
else:
# Kalau bukan URL, cari lagu dari service
tracks = self.service_manager.service.search(arg)
self.player.add_to_queue(tracks)
# Kalau gak lagi main apa-apa, langsung play
if self.player.state != State.Playing:
self.player.play_next()
if self.config.general.send_channel_messages:
if re.match(r'http[s]?://', arg):
message = self.translator.translate(
"{nickname} added a stream URL to the queue."
).format(nickname=user.nickname)
else:
message = self.translator.translate(
"{nickname} added {request} to the queue."
).format(nickname=user.nickname, request=arg)
self.run_async(
self.ttclient.send_message,
message,
type=2,
)
if re.match(r'http[s]?://', arg):
return self.translator.translate("Added stream URL to the queue.")
else:
return self.translator.translate("Added {} to the queue.").format(
tracks[0].name
)
except errors.NothingFoundError:
return self.translator.translate("Nothing is found for your query")
except errors.ServiceError:
return self.translator.translate(
"The selected service is currently unavailable"
)
class ChangeLogCommand(Command): class ChangeLogCommand(Command):
@property @property
def help(self) -> str: def help(self) -> str:
@ -689,3 +672,31 @@ class DefaultSeekStepCommand(Command):
else: else:
return self.translator.translate("Default Seek step can not be blank, Please specify default Seek step!") return self.translator.translate("Default Seek step can not be blank, Please specify default Seek step!")
class QueueCommand(Command):
@property
def help(self) -> str:
return self.translator.translate("Displays the current song queue")
def __call__(self, arg: str, user: User) -> Optional[str]:
if not self.player.track_list or self.player.state == State.Stopped:
return self.translator.translate("The queue is empty.")
# Get the currently playing track
now_playing = self.player.track
response = self.translator.translate("Now Playing: {track_name}\n\nQueue:\n").format(track_name=now_playing.name)
# Get the rest of the queue (all tracks after the current one)
queue_list = self.player.track_list[self.player.track_index + 1:]
if not queue_list:
response += self.translator.translate("No more tracks in the queue.")
return response
for i, track in enumerate(queue_list[:10]): # Show max 10 tracks
response += f"{i + 1}. {track.name}\n"
if len(queue_list) > 10:
remaining = len(queue_list) - 10
response += self.translator.translate("\n...and {count} more.").format(count=remaining)
return response

View File

@ -43,6 +43,7 @@ class Player:
self.state = State.Stopped self.state = State.Stopped
self.mode = Mode.TrackList self.mode = Mode.TrackList
self.volume = self.config.default_volume self.volume = self.config.default_volume
self.manual_queue = False
def initialize(self) -> None: def initialize(self) -> None:
logging.debug("Initializing player") logging.debug("Initializing player")
@ -55,6 +56,21 @@ class Player:
self._player.observe_property("media-title", self.on_metadata_update) self._player.observe_property("media-title", self.on_metadata_update)
logging.debug("Player callbacks registered") logging.debug("Player callbacks registered")
def enqueue(self, tracks: List[Track]) -> None:
"""Adds tracks to the queue, clearing the old playlist if needed."""
if self.state == State.Stopped or not self.track_list:
self.play(tracks)
self.manual_queue = True
else:
if not self.manual_queue:
now_playing_track = self.track_list[self.track_index]
self.track_list = [now_playing_track]
self.track_index = 0 # Reset index
self.manual_queue = True # Activate manual queue mode
# Now, just add the new track(s) to the end of the (possibly new) list.
self.track_list.extend(tracks)
def close(self) -> None: def close(self) -> None:
logging.debug("Closing player") logging.debug("Closing player")
if self.state != State.Stopped: if self.state != State.Stopped:
@ -68,6 +84,7 @@ class Player:
start_track_index: Optional[int] = None, start_track_index: Optional[int] = None,
) -> None: ) -> None:
if tracks != None: if tracks != None:
self.manual_queue = False
self.track_list = tracks self.track_list = tracks
if not start_track_index and self.mode == Mode.Random: if not start_track_index and self.mode == Mode.Random:
self.shuffle(True) self.shuffle(True)
@ -105,7 +122,6 @@ class Player:
self.cache_manager.save() self.cache_manager.save()
self._player.pause = False self._player.pause = False
self._player.play(arg) self._player.play(arg)
self._player.volume = self.volume
def next(self) -> None: def next(self) -> None:
track_index = self.track_index track_index = self.track_index
@ -157,7 +173,7 @@ class Player:
def play_by_index(self, index: int) -> None: def play_by_index(self, index: int) -> None:
if index < len(self.track_list) and index >= (0 - len(self.track_list)): if index < len(self.track_list) and index >= (0 - len(self.track_list)):
self.track = self.track_list[index] self.track = self.track_list[index]
self.track_index = index self.track_index = self.track_list.index(self.track)
self._play(self.track.url) self._play(self.track.url)
self.state = State.Playing self.state = State.Playing
else: else:
@ -182,25 +198,6 @@ class Player:
raise ValueError() raise ValueError()
self._player.speed = arg self._player.speed = arg
def add_to_queue(self, tracks: List[Track]) -> None:
"""Adds tracks to the queue."""
self.track_list.extend(tracks)
logging.debug(f"Added {len(tracks)} track(s) to the queue.")
# If nothing is playing, start playing the next track
if self.state == State.Stopped and len(self.track_list) > 0:
self.play_next()
def play_next(self) -> None:
"""Play the next track in the queue."""
if len(self.track_list) > 0:
self.track_index = 0 # Start with the first track
self.track = self.track_list[self.track_index]
self._play(self.track.url)
self.state = State.Playing
else:
self.state = State.Stopped
def seek_back(self, step: Optional[float] = None) -> None: def seek_back(self, step: Optional[float] = None) -> None:
step = step if step else self.config.seek_step step = step if step else self.config.seek_step
if step <= 0: if step <= 0:

View File

@ -1,6 +1,9 @@
This change log is written to find out the changes that have been made by Pandora, and the source code still refers to TTMediaBot. This change log is written to find out the changes that have been made by Pandora, and the source code still refers to TTMediaBot.
24-08-2025
1. Added queue.
5/17/2025 5/17/2025
rollback to TTSDk5.12 due 5.8 is gone. rollback to TTSDk5.12 due to 5.8 is gone.
5/14/2025 5/14/2025
Added entry for scheduler command on readme.md Added entry for scheduler command on readme.md
Fixed file caching: Fixed file caching: