teamtalkbot/bot/commands/admin_commands.py
Umiko f331b10358
Some checks are pending
Build-nightly / docker (push) Waiting to run
first commit.
2025-04-13 14:44:48 +07:00

890 lines
33 KiB
Python

from __future__ import annotations
import logging
import os
import subprocess
import sys
import time
from typing import Optional, TYPE_CHECKING
from queue import Empty
import humanize
from crontab import CronTab
from bot.commands.command import Command
from bot.config.models import CronEntryModel
from bot.player.enums import State
from bot import app_vars, errors
if TYPE_CHECKING:
from bot.TeamTalk.structs import User
class BlockCommandCommand(Command):
    """Admin command that shows or edits the list of blocked commands."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate(
            "+/-COMMAND Block or unblock command. +COMMAND add command to the block list. -COMMAND remove from it. Without a command shows the block list."
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Show, extend or shrink the blocked-command list.

        Args:
            arg: Empty to list blocked commands, ``+NAME`` to block ``NAME``,
                ``-NAME`` to unblock it. Matching is case-insensitive.
            user: The user who issued the command; receives the confirmation.

        Returns:
            The list (or an informational message) as text, or ``None`` when
            the confirmation is delivered through ``send_message`` instead.

        Raises:
            errors.InvalidArgumentError: If the prefix is not ``+``/``-`` or
                ``NAME`` is not a known command.
        """
        arg = arg.lower()
        blocked = self.config.general.blocked_commands
        if not arg:
            if blocked:
                return ", ".join(blocked)
            return self.translator.translate("the list is empty")

        action, command = arg[0], arg[1:]
        if action not in ("+", "-"):
            raise errors.InvalidArgumentError()
        if command not in self.command_processor.commands_dict:
            raise errors.InvalidArgumentError()

        if action == "+":
            if command in blocked:
                # The original built this message but never returned it,
                # so the user got no reply at all.
                return self.translator.translate("This command is already added")
            blocked.append(command)
            template = self.translator.translate(
                "command \"{command}\" was blocked."
            )
        else:
            if command not in blocked:
                return self.translator.translate("This command is not blocked")
            blocked.remove(command)
            template = self.translator.translate(
                "command \"{command}\" was unblocked."
            )

        self.run_async(
            self.ttclient.send_message,
            template.format(command=command),
            user,
        )
        self.config_manager.save()
        return None
class ChangeGenderCommand(Command):
    """Admin command that changes the bot's gender and stores it in the config."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate(
            "GENDER Changes bot's gender. n neutral, m male, f female"
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Apply ``arg`` as the new gender and save the configuration.

        Raises:
            errors.InvalidArgumentError: If applying the value raises KeyError.
        """
        try:
            self.ttclient.change_gender(arg)
            self.config.teamtalk.gender = arg
        except KeyError:
            raise errors.InvalidArgumentError()
        else:
            # Reached only when the gender was accepted.
            self.config_manager.save()
        return None
class ChangeLanguageCommand(Command):
    """Admin command that reports or switches the bot's interface language."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("LANGUAGE change player language")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Switch to locale ``arg``, or describe the available locales.

        With an empty ``arg`` the current locale and all available locales are
        returned as text. Otherwise the locale is applied, the status text is
        reset, the user is notified and the configuration is saved.
        """
        if not arg:
            summary = self.translator.translate(
                "Current language: {current_language}. Available languages: {available_languages}."
            )
            return summary.format(
                current_language=self.translator.get_locale(),
                available_languages=", ".join(self.translator.get_locales()),
            )

        try:
            self.translator.set_locale(arg)
            self.config.general.language = arg
            self.ttclient.change_status_text("")
            notice = self.translator.translate(
                "the language has been changed to {language}."
            ).format(language=arg)
            self.run_async(self.ttclient.send_message, notice, user)
            self.config_manager.save()
        except errors.LocaleNotFoundError:
            return self.translator.translate("Incorrect language")
        return None
class ChangeNicknameCommand(Command):
    """Admin command that renames the bot, or resets it to the default name."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("NICKNAME Changes bot's nickname")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Set the nickname to ``arg``, or to ``app_vars.client_name`` if empty.

        The new name is pushed to the TeamTalk client, written to the config,
        confirmed to ``user`` and the configuration is saved.
        """
        if arg:
            nickname = arg
            template = self.translator.translate("nickname change to {newname}.")
        else:
            nickname = app_vars.client_name
            template = self.translator.translate(
                "nickname set to default client ({newname})."
            )

        self.ttclient.change_nickname(nickname)
        self.config.teamtalk.nickname = nickname
        self.run_async(
            self.ttclient.send_message,
            template.format(newname=nickname),
            user,
        )
        self.config_manager.save()
        return None
class ClearCacheCommand(Command):
    """Admin command that clears the recents and/or favorites cache."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate(
            "r/f Clears bot's cache. r clears recents, f clears favorites, without an option clears the entire cache"
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Clear the requested part of the cache and persist the result.

        Args:
            arg: ``""`` clears everything, ``"r"`` recents, ``"f"`` favorites.
            user: The user who issued the command (unused).

        Returns:
            A translated confirmation message.

        Raises:
            errors.InvalidArgumentError: For any other option. Previously an
                unknown option was silently ignored with no reply.
        """
        if not arg:
            self.cache.recents.clear()
            self.cache.favorites.clear()
            message = self.translator.translate("Cache cleared")
        elif arg == "r":
            self.cache.recents.clear()
            message = self.translator.translate("Recents cleared")
        elif arg == "f":
            self.cache.favorites.clear()
            message = self.translator.translate("Favorites cleared")
        else:
            raise errors.InvalidArgumentError()
        self.cache_manager.save()
        return message
class ShowDefaultServerCommand(Command):
    """Admin command that prints the default connection settings from the config."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("Read config file and show default Server, Port, Username, Password, Channel, Channel password.")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Return a multi-line summary of ``config.teamtalk`` connection fields."""
        template = self.translator.translate(
            "Server: {hostname}\n"
            "TCP Port: {tcpport}\n"
            "UDP Port: {udpport}\n"
            "Username: {username}\n"
            "Password: {password}\n"
            "Channel: {channel}\n"
            "Channel password: {channel_password}"
        )
        settings = self.config.teamtalk
        values = {
            "hostname": settings.hostname,
            "tcpport": settings.tcp_port,
            "udpport": settings.udp_port,
            "username": settings.username,
            "password": settings.password,
            "channel": settings.channel,
            "channel_password": settings.channel_password,
        }
        return template.format(**values)
class DefaultServerCommand(Command):
    """Admin command that changes the default server hostname in the config."""

    @property
    def help(self):
        """Return the translated usage text for this command."""
        return self.translator.translate(
            'Change default Server information.'
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Store ``arg`` as the default hostname; an empty ``arg`` is rejected with a message."""
        if not arg:
            return self.translator.translate("Default server can not be blank, Please specify default Server information!")

        self.config.teamtalk.hostname = arg
        confirmation = self.translator.translate(
            "Default Server change to {newhostname}."
        ).format(newhostname=arg)
        self.run_async(self.ttclient.send_message, confirmation, user)
        self.config_manager.save()
        return None
class DefaultTCPPortCommand(Command):
    """Admin command that changes the default TCP port in the config."""

    @property
    def help(self):
        """Return the translated usage text for this command."""
        return self.translator.translate(
            'Change default TCP port information.'
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Store ``arg`` as the default TCP port.

        Args:
            arg: The port number as text.
            user: The user who issued the command; receives the confirmation.

        Returns:
            An explanatory message when ``arg`` is empty, otherwise ``None``.

        Raises:
            errors.InvalidArgumentError: If ``arg`` is not an integer in the
                valid port range 1-65535.
        """
        if not arg:
            return self.translator.translate("Default TCP port can not be blank, Please specify default TCP port!")

        # Parse once and keep the try body minimal so unrelated ValueErrors
        # from sending or saving are not misreported as a bad argument.
        try:
            port = int(arg)
        except ValueError:
            raise errors.InvalidArgumentError() from None
        if not 1 <= port <= 65535:
            raise errors.InvalidArgumentError()

        self.config.teamtalk.tcp_port = port
        self.run_async(
            self.ttclient.send_message,
            self.translator.translate(
                "Default TCP port change to {newtcp_port}."
            ).format(newtcp_port=port),
            user,
        )
        self.config_manager.save()
        return None
class DefaultUDPPortCommand(Command):
    """Admin command that changes the default UDP port in the config."""

    @property
    def help(self):
        """Return the translated usage text for this command."""
        return self.translator.translate(
            'Change default UDP port information.'
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Store ``arg`` as the default UDP port.

        Args:
            arg: The port number as text.
            user: The user who issued the command; receives the confirmation.

        Returns:
            An explanatory message when ``arg`` is empty, otherwise ``None``.

        Raises:
            errors.InvalidArgumentError: If ``arg`` is not an integer in the
                valid port range 1-65535.
        """
        if not arg:
            return self.translator.translate("Default UDP port can not be blank, Please specify default UDP port!")

        # Parse once and keep the try body minimal so unrelated ValueErrors
        # from sending or saving are not misreported as a bad argument.
        try:
            port = int(arg)
        except ValueError:
            raise errors.InvalidArgumentError() from None
        if not 1 <= port <= 65535:
            raise errors.InvalidArgumentError()

        self.config.teamtalk.udp_port = port
        self.run_async(
            self.ttclient.send_message,
            # The original confirmation wrongly said "TCP port" here.
            self.translator.translate(
                "Default UDP port change to {newudp_port}."
            ).format(newudp_port=port),
            user,
        )
        self.config_manager.save()
        return None
class DefaultUsernameCommand(Command):
    """Admin command that sets or blanks the default username in the config."""

    @property
    def help(self):
        """Return the translated usage text for this command."""
        return self.translator.translate(
            'Change default Username information. Without any arguments, set default Username to blank.'
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Store ``arg`` as the default username; an empty ``arg`` blanks it."""
        if not arg:
            self.config.teamtalk.username = ""
            self.config_manager.save()
            return self.translator.translate("Default username set to blank.")

        self.config.teamtalk.username = arg
        confirmation = self.translator.translate(
            "Default Username change to {newusername}."
        ).format(newusername=arg)
        self.run_async(self.ttclient.send_message, confirmation, user)
        self.config_manager.save()
        return None
class DefaultPasswordCommand(Command):
    """Admin command that sets or blanks the default password in the config."""

    @property
    def help(self):
        """Return the translated usage text for this command."""
        return self.translator.translate(
            'Change default Password information. Without any arguments, set default password to blank.'
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Store ``arg`` as the default password; an empty ``arg`` blanks it."""
        if not arg:
            self.config.teamtalk.password = ""
            self.config_manager.save()
            return self.translator.translate("Default Password set to blank.")

        self.config.teamtalk.password = arg
        confirmation = self.translator.translate(
            "Default Password change to {newpassword}."
        ).format(newpassword=arg)
        self.run_async(self.ttclient.send_message, confirmation, user)
        self.config_manager.save()
        return None
class DefaultChannelCommand(Command):
    """Admin command that sets the default channel in the config."""

    @property
    def help(self):
        """Return the translated usage text for this command."""
        return self.translator.translate(
            'Change default Channel information. Without any arguments, set default Channel to current channel.'
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Store ``arg`` as the default channel, or the current channel's id if empty."""
        if not arg:
            # No argument: remember the channel the bot is in, by id.
            self.config.teamtalk.channel = str(self.ttclient.channel.id)
            self.config_manager.save()
            return self.translator.translate("Default Channel set to current channel, if your channel has a password, please set it manually!")

        self.config.teamtalk.channel = arg
        confirmation = self.translator.translate(
            "Default Channel change to {newchannel}, if your channel has a password, please set it manually!"
        ).format(newchannel=arg)
        self.run_async(self.ttclient.send_message, confirmation, user)
        self.config_manager.save()
        return None
class DefaultChannelPasswordCommand(Command):
    """Admin command that sets or blanks the default channel password in the config."""

    @property
    def help(self):
        """Return the translated usage text for this command."""
        return self.translator.translate(
            'Change default Channel password information. Without any arguments, set default Channel password to blank..'
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Store ``arg`` as the default channel password; an empty ``arg`` blanks it."""
        if not arg:
            self.config.teamtalk.channel_password = ""
            self.config_manager.save()
            return self.translator.translate("Default Channel password set to blank.")

        self.config.teamtalk.channel_password = arg
        confirmation = self.translator.translate(
            "Default Channel password change to {newchannel_password}."
        ).format(newchannel_password=arg)
        self.run_async(self.ttclient.send_message, confirmation, user)
        self.config_manager.save()
        return None
class GetChannelIDCommand(Command):
    """Admin command that reports the id of the channel the bot is currently in."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("Get current channel ID, will be useful for several things.")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Return the current channel's id converted to text."""
        current_channel = self.ttclient.channel
        return str(current_channel.id)
class JoinChannelCommand(Command):
    """Admin command that moves the bot into another channel."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate(
            'Join channel. first argument is channel name or id, second argument is password, split argument " | ", if password is undefined, don\'t type second argument'
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Join the requested channel and wait for the server's answer.

        Args:
            arg: ``CHANNEL`` or ``CHANNEL | PASSWORD``. When empty, the default
                channel and channel password from the config are used. A
                purely numeric channel is passed on as an integer id.
            user: The user who issued the command (unused).

        Returns:
            ``None`` once the join is confirmed, otherwise a translated error
            message.
        """
        args = self.command_processor.split_arg(arg)
        if not arg:
            channel = self.config.teamtalk.channel
            password = self.config.teamtalk.channel_password
        elif len(args) == 2:
            channel, password = args[0], args[1]
        else:
            channel = arg
            password = ""
        if isinstance(channel, str) and channel.isdigit():
            channel = int(channel)

        try:
            cmd = self.ttclient.join_channel(channel, password)
        except ValueError:
            return self.translator.translate("This channel does not exist")

        # Poll both result queues until an entry for our command id shows up.
        # Entries belonging to other commands are put back for their owners.
        # NOTE(review): there is no timeout; this blocks until the server
        # answers - confirm that is acceptable for the calling thread.
        while True:
            try:
                event = self.ttclient.event_success_queue.get_nowait()
            except Empty:
                pass
            else:
                if event.source == cmd:
                    break
                self.ttclient.event_success_queue.put(event)

            try:
                error = self.ttclient.errors_queue.get_nowait()
            except Empty:
                pass
            else:
                if error.command_id == cmd:
                    # Translate the template first, then fill it in. The
                    # original formatted before translating, so the catalogue
                    # lookup could never match.
                    return self.translator.translate(
                        "Error joining channel: {error}"
                    ).format(error=error.message)
                self.ttclient.errors_queue.put(error)

            time.sleep(app_vars.loop_timeout)
        return None
""" class CreateChannelCommand(Command):
@property
def help(self) -> str:
return self.translator.translate(
'Create channel. first argument is channel name, second argument is parent channel ID, third argument is the channel topic, fourth argument is the channel password. Split arguments " | ", if password or topic is undefined, don\'t type fourth or third argument.'
)
def __call__(self, arg: str, user: User) -> Optional[str]:
args = self.command_processor.split_arg(arg)
if len(args) == 4:
channel_name = args[0]
parent_channel_id = int(args[1])
channel_topic = args[2]
channel_password = args[3]
elif len(args) == 3:
channel_name = args[0]
parent_channel_id = int(args[1])
channel_topic = args[2]
channel_password = ""
elif len(args) == 2:
channel_name = args[0]
parent_channel_id = int(args[1])
channel_topic = ""
channel_password = ""
else:
return self.help
"""
""" class TaskSchedulerCommand(Command):
@property
def help(self) -> str:
return self.translator.translate("Task scheduler")
def __call__(self, arg: str, user: User) -> Optional[str]:
if arg[0] == "+":
self._add(arg[1::])
def _add(self, arg: str) -> None:
args = arg.split("|")
timestamp = self._get_timestamp(args[0])
task = []
for arg in args[1::]:
try:
command, arg = self.parse_command(message.text)
if self.check_access(message.user, command):
command = self.get_command(command, message.user)
task.append((command, arg))
except errors.AccessDeniedError as e:
return e
except (errors.ParseCommandError, errors.UnknownCommandError):
return self.translator.translate("Unknown command. Send \"start\" for help.")
except errors.InvalidArgumentError:
return self.help(command, message.user)
if timestamp in self.module_manager.task_scheduler.tasks:
self.module_manager.task_scheduler[timestamp].append(task)
else:
self.module_manager.task_scheduler.tasks[timestamp] = [task,]
def _get_timestamp(self, t):
return int(datetime.combine(datetime.today(), datetime.strptime(t, self.config["general"]["time_format"]).time()).timestamp())
"""
class VoiceTransmissionCommand(Command):
    """Admin command that toggles the bot's voice transmission."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("Enables or disables voice transmission")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Flip voice transmission and return the new state as text.

        While the player is stopped the status text is updated as well: set to
        the "enabled" message when turning on, cleared when turning off.
        """
        if self.ttclient.is_voice_transmission_enabled:
            self.ttclient.disable_voice_transmission()
            if self.player.state == State.Stopped:
                self.ttclient.change_status_text("")
            return self.translator.translate("Voice transmission disabled")

        self.ttclient.enable_voice_transmission()
        if self.player.state == State.Stopped:
            self.ttclient.change_status_text(
                self.translator.translate("Voice transmission enabled")
            )
        return self.translator.translate("Voice transmission enabled")
class LockCommand(Command):
    """Admin command that toggles the bot's locked state."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("Locks or unlocks the bot")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Flip the lock, mirror it into the config, notify ``user`` and save."""
        processor = self.command_processor
        processor.locked = not processor.locked
        if processor.locked:
            self.config.general.bot_lock = True
            notice = self.translator.translate("Bot is now locked by administrator.")
        else:
            self.config.general.bot_lock = False
            notice = self.translator.translate("Bot is now unlocked by administrator.")
        self.run_async(self.ttclient.send_message, notice, user)
        self.config_manager.save()
        return None

    def on_startup(self) -> None:
        """Copy the lock flag stored in the config onto the command processor."""
        stored_lock = self.config.general.bot_lock
        self.command_processor.locked = stored_lock
class ChangeStatusCommand(Command):
    """Admin command that sets or resets the bot's status text."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("STATUS Changes bot's status")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Apply ``arg`` as status text (empty resets it), notify ``user`` and save."""
        if not arg:
            self.config.teamtalk.status = ""
            self.ttclient.change_status_text("")
            notice = self.translator.translate("Status change to default.")
        else:
            self.ttclient.change_status_text(arg)
            # Store whatever status the client reports after the change.
            self.config.teamtalk.status = self.ttclient.status
            notice = self.translator.translate("Status changed.")
        self.run_async(self.ttclient.send_message, notice, user)
        self.config_manager.save()
        return None
class EventHandlingCommand(Command):
    """Admin command that toggles the ``load_event_handlers`` config flag."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("Enables or disables event handling")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Flip the flag in memory and return the new state as text."""
        self.config.teamtalk.event_handling.load_event_handlers = (
            not self.config.teamtalk.event_handling.load_event_handlers
        )
        if self.config.teamtalk.event_handling.load_event_handlers:
            return self.translator.translate("Event handling is enabled")
        return self.translator.translate("Event handling is disabled")
class SendChannelMessages(Command):
    """Admin command that relays a message to the channel, or toggles that feature."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate(
            "Send your message to current channel. Without any argument, turn or off Channel messages."
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Send ``arg`` to the current channel, or toggle channel messages.

        Args:
            arg: The text to send. When empty, the ``send_channel_messages``
                setting is flipped instead.
            user: The sender; their nickname is appended to the message.

        Returns:
            The new on/off state after a toggle, a hint when the feature is
            disabled, or ``None`` after a message was sent.
        """
        if not arg:
            self.config.general.send_channel_messages = (
                not self.config.general.send_channel_messages
            )
            # Persist before returning. The original called save() after the
            # return statement, so the toggle was never written to disk.
            self.config_manager.save()
            if self.config.general.send_channel_messages:
                return self.translator.translate("Channel messages enabled")
            return self.translator.translate("Channel messages disabled")

        if not self.config.general.send_channel_messages:
            return self.translator.translate(
                "Please enable channel messages first"
            )

        self.ttclient.send_message(
            self.translator.translate("{message}.\nSender: {nickname}.").format(
                message=arg, nickname=user.nickname
            ),
            type=2,
        )
        return None
class SendBroadcastMessages(Command):
    """Admin command that broadcasts a message, or toggles that feature."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate(
            "Send broadcast message to all users. Without any argument, turn on or off Broadcast messages"
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Broadcast ``arg``, or toggle broadcast messages.

        Args:
            arg: The text to send. When empty, the ``send_broadcast_messages``
                setting is flipped instead.
            user: The sender; their nickname is appended to the message.

        Returns:
            The new on/off state after a toggle, a hint when the feature is
            disabled, or ``None`` after a message was sent.
        """
        if not arg:
            self.config.general.send_broadcast_messages = (
                not self.config.general.send_broadcast_messages
            )
            # Persist before returning. The original called save() after the
            # return statement, so the toggle was never written to disk.
            self.config_manager.save()
            if self.config.general.send_broadcast_messages:
                return self.translator.translate("Broadcast messages enabled")
            return self.translator.translate("Broadcast messages disabled")

        if not self.config.general.send_broadcast_messages:
            return self.translator.translate(
                "Please enable broadcast messages first"
            )

        self.ttclient.send_message(
            self.translator.translate("{message}.\nSender: {nickname}.").format(
                message=arg, nickname=user.nickname
            ),
            type=3,
        )
        return None
class SaveConfigCommand(Command):
    """Admin command that writes the current configuration out via the config manager."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("Saves bot's configuration")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Save the configuration and return a confirmation message."""
        self.config_manager.save()
        confirmation = self.translator.translate("Configuration saved")
        return confirmation
class AdminUsersCommand(Command):
    """Admin command that shows or edits the list of administrator usernames."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate(
            "+/-USERNAME Manage the list of administrators. +USERNAME add a user. -USERNAME remove it. Without an option show the list."
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Show, extend or shrink the administrator list.

        Args:
            arg: Empty to list admins, ``+USERNAME`` to add, ``-USERNAME`` to
                remove. An empty username is displayed as ``<Anonymous>``.
            user: The user who issued the command; receives the confirmation.

        Returns:
            The list or an informational message, or ``None`` when the
            confirmation is delivered through ``send_message``.

        Raises:
            errors.InvalidArgumentError: If ``arg`` does not start with ``+``
                or ``-``. Previously such input was silently ignored.
        """
        admins = self.config.teamtalk.users.admins
        if not arg:
            if not admins:
                return self.translator.translate("the list is empty")
            # Every empty username is shown as a placeholder, not only the first.
            return ", ".join(name if name != "" else "<Anonymous>" for name in admins)

        action, username = arg[0], arg[1:]
        if action == "+":
            admins.append(username)
            template = self.translator.translate(
                "{username} is now admin in this player."
            )
        elif action == "-":
            try:
                admins.remove(username)
            except ValueError:
                return self.translator.translate(
                    "this user is not in the admin list"
                )
            template = self.translator.translate(
                "{username} no longer admin in this player."
            )
        else:
            raise errors.InvalidArgumentError()

        self.run_async(
            self.ttclient.send_message,
            template.format(username=username),
            user,
        )
        self.config_manager.save()
        return None
class BannedUsersCommand(Command):
    """Admin command that shows or edits the list of banned usernames."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate(
            "+/-USERNAME Manage the list of banned users. +USERNAME add a user. -USERNAME remove it. Without an option show the list."
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Show, extend or shrink the banned-user list.

        Args:
            arg: Empty to list banned users, ``+USERNAME`` to ban,
                ``-USERNAME`` to unban. An empty username is displayed as
                ``<Anonymous>``.
            user: The user who issued the command; receives the confirmation.

        Returns:
            The list or an informational message, or ``None`` when the
            confirmation is delivered through ``send_message``.

        Raises:
            errors.InvalidArgumentError: If ``arg`` does not start with ``+``
                or ``-``. Previously such input was silently ignored.
        """
        banned_users = self.config.teamtalk.users.banned_users
        if not arg:
            if not banned_users:
                return self.translator.translate("the list is empty")
            # Every empty username is shown as a placeholder, not only the first.
            return ", ".join(
                name if name != "" else "<Anonymous>" for name in banned_users
            )

        action, username = arg[0], arg[1:]
        if action == "+":
            banned_users.append(username)
            template = self.translator.translate("{username} now banned")
        elif action == "-":
            try:
                banned_users.remove(username)
            except ValueError:
                return self.translator.translate("this user is not banned")
            template = self.translator.translate("{username} now unbanned.")
        else:
            raise errors.InvalidArgumentError()

        self.run_async(
            self.ttclient.send_message,
            template.format(username=username),
            user,
        )
        self.config_manager.save()
        return None
class QuitCommand(Command):
    """Admin command that saves the configuration and shuts the bot down."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("Quits the bot")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Save the config, mute the player if it is not stopped, then close the bot."""
        self.config_manager.save()
        player_active = self.player.state != State.Stopped
        # Volume 0 is applied only while it lies within [0, max_volume].
        if player_active and 0 <= self.config.player.max_volume:
            self.player.set_volume(0)
        self._bot.close()
        return None
class RestartCommand(Command):
    """Admin command that saves the configuration and restarts the bot process."""

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate("Restarts the bot")

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Save the config, mute and close the bot, then relaunch it.

        On Windows a new interpreter is started with ``subprocess.run``;
        elsewhere the current process is replaced with ``os.execv``.
        """
        self.config_manager.save()
        if self.player.state != State.Stopped:
            # Volume 0 is applied only while it lies within [0, max_volume].
            if 0 <= self.config.player.max_volume:
                self.player.set_volume(0)
        self._bot.close()

        # Build a fresh argv list. The original aliased sys.argv and inserted
        # into it, mutating the global argument list in place.
        command_line = [sys.executable, *sys.argv]
        if sys.platform == "win32":
            subprocess.run(command_line)
        else:
            os.execv(sys.executable, command_line)
        return None
class SchedulerCommand(Command):
    """Admin command that controls the cron-style task scheduler.

    Sub-commands: ``toggle``/``t``, ``add CRON|COMMAND``, ``rm NUMBER``,
    ``list``/``ls`` and ``help``/``?``/``h``. Anything else prints the
    scheduler status, the task list and a short command summary.
    """

    @property
    def help(self) -> str:
        """Return the translated usage text for this command."""
        return self.translator.translate(
            "Controls the cron scheduler, allowing admins to toggle it and add / remove scheduled commands"
        )

    def __call__(self, arg: str, user: User) -> Optional[str]:
        """Dispatch ``arg`` to the matching sub-command and return its reply."""
        subcommand = arg.strip()
        if arg in ("toggle", "t"):
            return self.toggle()
        if arg.startswith("add "):
            # Everything after "add " is the cron expression and command.
            return self.add(arg[4:])
        if arg.startswith("rm "):
            return self.remove(arg[3:])
        if subcommand in ("list", "ls"):
            return self.list_tasks()
        if subcommand in ("help", "?", "h"):
            return (
                "Unknown cr subcommand. Options:\n"
                "toggle (shorten to t) - turn the scheduler on or off.\n"
                "add - add a cron task.\n"
                "Format is cron expression|command with arguments.\n"
                "Remove - remove a task.\n"
                "list or ls - list all scheduled tasks."
            )

        if self.config.schedule.enabled:
            status = self.translator.translate(
                "Scheduled tasks are enabled (disable with 'cr toggle')"
            )
        else:
            status = self.translator.translate(
                "Scheduled tasks are disabled; enable with 'cr toggle'"
            )
        listing = self.list_tasks()
        if not listing.endswith("\n"):
            # The "no tasks" message has no trailing newline; without this the
            # command summary would be glued onto it.
            listing += "\n"
        summary = self.translator.translate(
            "Commands: \ncr add cron expression|command\ncr rm #\ncr ls\ncr toggle"
        )
        return status + "\n" + listing + summary

    def toggle(self):
        """Flip ``config.schedule.enabled`` and return the new state as text.

        Patterns are re-parsed when the scheduler is switched on.
        """
        self.config.schedule.enabled = not self.config.schedule.enabled
        if self.config.schedule.enabled:
            self.reparse_patterns()
            return self.translator.translate("Scheduler enabled.")
        return self.translator.translate("Scheduler disabled")

    def reparse_patterns(self):
        """Rebuild ``self._bot.cron_patterns`` from the configured entries.

        Each element is a ``(CronTab, entry)`` tuple.
        """
        self._bot.cron_patterns = []
        for entry in self.config.schedule.patterns:
            logging.debug(
                "Parsing cron pattern '%s' and appending to list", entry.pattern
            )
            self._bot.cron_patterns.append((CronTab(entry.pattern), entry))

    def add(self, argstr: str) -> str:
        """Schedule a new task from ``"CRON EXPRESSION | COMMAND"``.

        Returns a translated confirmation, or an explanatory message when the
        format or the cron expression is invalid.
        """
        parts = argstr.split("|")
        if len(parts) != 2:
            return self.translator.translate(
                "Incorrect format. Cron expression | command you want to run with arguments after"
            )
        cronexpr = parts[0].strip()
        cmd = parts[1].strip()
        try:
            # Constructed only to validate the expression; the parsed objects
            # that are actually used are built in reparse_patterns().
            CronTab(cronexpr)
            entry = CronEntryModel(pattern=cronexpr, command=cmd)
        except ValueError:
            return self.translator.translate(
                "Not a valid cron expression. Pleaes use cr expression-help for more details."
            )
        self.config.schedule.patterns.append(entry)
        self.reparse_patterns()
        return self.translator.translate("Task scheduled.")

    def remove(self, arg: str) -> str:
        """Remove the task with 1-based number ``arg``.

        An empty ``arg`` returns the task list instead. Non-numeric or
        out-of-range numbers produce an explanatory message.
        """
        if arg == "":
            return self.list_tasks()
        try:
            task_index = int(arg) - 1
        except ValueError:
            return self.translator.translate("Invalid task number")

        patterns = self.config.schedule.patterns
        # Valid zero-based indices are 0 .. len-1. The original used ">" and
        # let index == len through to pop(), and its message referenced an
        # undefined name.
        if not 0 <= task_index < len(patterns):
            return self.translator.translate(
                "Task number out of range - should be 1 to {}"
            ).format(len(patterns))

        task = patterns.pop(task_index)
        self.reparse_patterns()
        # Translate the template first, then fill it in, so the catalogue
        # lookup sees a constant message id.
        return self.translator.translate(
            "Removed task #{number}, {pattern}: {command}"
        ).format(number=task_index + 1, pattern=task.pattern, command=task.command)

    def list_tasks(self) -> str:
        """Return a text listing of all scheduled tasks.

        Each task contributes its number, cron pattern, command and the
        humanized distance to its previous and next run. Every line of the
        listing ends with a newline.
        """
        if not self.config.schedule.patterns:
            return self.translator.translate("There are no scheduled tasks")

        lines: list[str] = []
        for idx, task in enumerate(self.config.schedule.patterns):
            cron = CronTab(task.pattern)
            lines.append("Task #{}".format(idx + 1))
            lines.append(" Cron pattern: {}".format(task.pattern))
            lines.append(" command: {}".format(task.command))
            lines.append(
                "Pattern ran: {} ago;".format(
                    humanize.precisedelta(cron.previous(default_utc=False))
                )
            )
            lines.append(
                "Next run in {}.".format(
                    humanize.precisedelta(cron.next(default_utc=False))
                )
            )
        return "".join(line + "\n" for line in lines)