teamtalkbot/bot/TeamTalk/__init__.py
Umiko f331b10358
Some checks are pending
Build-nightly / docker (push) Waiting to run
first commit.
2025-04-13 14:44:48 +07:00

387 lines
13 KiB
Python

from __future__ import annotations
import logging
import os
import re
import sys
from typing import AnyStr, List, TYPE_CHECKING, Optional, Union
from queue import Queue
from bot import app_vars
from bot.sound_devices import SoundDevice, SoundDeviceType
if sys.platform == "win32":
if sys.version_info.major == 3 and sys.version_info.minor >= 8:
os.add_dll_directory(app_vars.directory)
else:
os.chdir(app_vars.directory)
from bot.TeamTalk.thread import TeamTalkThread
from bot.TeamTalk.structs import *
import TeamTalkPy
re_line_endings = re.compile("[\\r\\n]")
if TYPE_CHECKING:
from bot import Bot
def _str(data: AnyStr) -> AnyStr:
if isinstance(data, str):
if os.supports_bytes_environ:
return bytes(data, "utf-8")
else:
return data
else:
return str(data, "utf-8")
def split(text: str, max_length: int = app_vars.max_message_length) -> List[str]:
if len(text) <= max_length:
lines = [text]
else:
lines = [""]
for line in text.split("\n"):
if len(line) <= max_length:
if len(lines[-1]) > 0 and len(lines[-1]) + len(line) + 1 <= max_length:
lines[-1] += "\n" + line
elif len(lines) == 1 and len(lines[0]) == 0:
lines[0] = line
else:
lines.append(line)
else:
words = [""]
for word in line.split(" "):
if len(word) <= max_length:
if (
len(words[-1]) > 0
and len(words[-1]) + len(word) + 1 <= max_length
):
words[-1] += " " + word
elif len(words) == 1 and len(words[0]) == 0:
words[0] == word
else:
words.append(word)
else:
chunk = word
for _ in range(0, int(len(chunk) / max_length) + 1):
words.append(chunk[0:max_length])
chunk = chunk[max_length::]
lines += words
return lines
class TeamTalk:
def __init__(self, bot: Bot) -> None:
self.config = bot.config.teamtalk
self.translator = bot.translator
TeamTalkPy.setLicense(
_str(self.config.license_name), _str(self.config.license_key)
)
self.tt = TeamTalkPy.TeamTalk()
self.state = State.NOT_CONNECTED
self.is_voice_transmission_enabled = False
self.nickname = self.config.nickname
self.gender = UserStatusMode.__members__[self.config.gender.upper()]
self.status = self.default_status
self.errors_queue: Queue[Error] = Queue()
self.event_success_queue: Queue[Event] = Queue()
self.message_queue: Queue[Message] = Queue()
self.myself_event_queue: Queue[Event] = Queue()
self.uploaded_files_queue: Queue[File] = Queue()
self.thread = TeamTalkThread(bot, self)
self.reconnect = False
self.reconnect_attempt = 0
self.user_account: UserAccount
def initialize(self) -> None:
logging.debug("Initializing TeamTalk")
self.thread.start()
self.connect()
logging.debug("TeamTalk initialized")
def close(self) -> None:
logging.debug("Closing teamtalk")
self.thread.close()
self.disconnect()
self.state = State.NOT_CONNECTED
self.tt.closeTeamTalk()
logging.debug("Teamtalk closed")
def connect(self) -> None:
self.state = State.CONNECTING
self.tt.connect(
_str(self.config.hostname),
self.config.tcp_port,
self.config.udp_port,
0,
0,
self.config.encrypted,
)
def disconnect(self) -> None:
self.tt.disconnect()
self.state = State.NOT_CONNECTED
def login(self) -> None:
self.tt.doLogin(
_str(self.config.nickname),
_str(self.config.username),
_str(self.config.password),
_str(app_vars.client_name),
)
def join(self) -> None:
if isinstance(self.config.channel, int):
channel_id = int(self.config.channel)
else:
channel_id = self.tt.getChannelIDFromPath(_str(self.config.channel))
if channel_id == 0:
channel_id = 1
self.tt.doJoinChannelByID(channel_id, _str(self.config.channel_password))
@property
def default_status(self) -> str:
if self.config.status:
return self.config.status
else:
return self.translator.translate('PM "start" to get short help!')
def send_message(
self, text: str, user: Optional[User] = None, type: int = 1
) -> None:
for string in split(text):
message = TeamTalkPy.TextMessage()
message.nFromUserID = self.tt.getMyUserID()
message.nMsgType = type
message.szMessage = _str(string)
if type == 1:
if isinstance(user, int):
message.nToUserID = user
else:
message.nToUserID = user.id
elif type == 2:
message.nChannelID = self.tt.getMyChannelID()
self.tt.doTextMessage(message)
def send_file(self, channel: Union[int, str], file_path: str):
if isinstance(channel, int):
channel_id = channel
else:
channel_id = self.tt.getChannelIDFromPath(_str(channel))
if channel_id == 0:
raise ValueError()
return self.tt.doSendFile(channel_id, _str(file_path))
def delete_file(self, channel: Union[int, str], file_id: int) -> int:
if isinstance(channel, int):
channel_id = channel
else:
channel_id = self.tt.getChannelIDFromPath(_str(channel))
if channel_id == 0 or file_id == 0:
raise ValueError()
return self.tt.doDeleteFile(channel_id, file_id)
def join_channel(self, channel: Union[str, int], password: str) -> int:
if isinstance(channel, int):
channel_id = channel
else:
channel_id = self.tt.getChannelIDFromPath(_str(channel))
if channel_id == 0:
raise ValueError()
return self.tt.doJoinChannelByID(channel_id, _str(password))
def change_nickname(self, nickname: str) -> None:
self.tt.doChangeNickname(_str(nickname))
def change_status_text(self, text: str) -> None:
if text:
self.status = split(text)[0]
else:
self.status = split(self.default_status)[0]
self.tt.doChangeStatus(self.gender.value, _str(self.status))
def change_gender(self, gender: str) -> None:
self.gender = UserStatusMode.__members__[gender.upper()]
self.tt.doChangeStatus(self.gender.value, _str(self.status))
def get_channel(self, channel_id: int) -> Channel:
channel = self.tt.getChannel(channel_id)
return self.get_channel_from_obj(channel)
def get_channel_from_obj(self, obj: TeamTalkPy.Channel) -> Channel:
try:
return Channel(
obj.nChannelID,
obj.szName,
obj.szTopic,
obj.nMaxUsers,
ChannelType(obj.uChannelType),
)
except ValueError:
return Channel(0, "", "", 0, ChannelType.Default)
@property
def flags(self) -> Flags:
return Flags(self.tt.getFlags())
def get_error(self, error_no: int, cmdid: int) -> Error:
try:
error_type = ErrorType(error_no)
except ValueError:
error_type = ErrorType(0)
return Error(_str(self.tt.getErrorMessage(error_no)), error_type, cmdid)
def get_message(self, msg: TeamTalkPy.TextMessage) -> Message:
try:
return Message(
re.sub(re_line_endings, "", _str(msg.szMessage)),
self.get_user(msg.nFromUserID),
self.get_channel(msg.nChannelID),
MessageType(msg.nMsgType),
)
except ValueError:
return Message("", self.get_user(1), self.get_channel(1), MessageType.User)
def get_file(self, file: TeamTalkPy.RemoteFile) -> File:
return File(
file.nFileID,
_str(file.szFileName),
self.get_channel(file.nChannelID),
file.nFileSize,
_str(file.szUsername),
)
@property
def user(self) -> User:
user = self.get_user(self.tt.getMyUserID())
user.user_account = self.user_account
return user
@property
def channel(self) -> Channel:
return self.get_channel(self.tt.getMyChannelID())
def get_user(self, id: int) -> User:
user = self.tt.getUser(id)
gender = UserStatusMode(user.nStatusMode)
return User(
user.nUserID,
_str(user.szNickname),
_str(user.szUsername),
_str(user.szStatusMsg),
gender,
UserState(user.uUserState),
self.get_channel(user.nChannelID),
_str(user.szClientName),
user.uVersion,
self.get_user_account(_str(user.szUsername)),
UserType(user.uUserType),
True
if _str(user.szUsername) in self.config.users.admins or user.uUserType == 2
else False,
_str(user.szUsername) in self.config.users.banned_users,
)
def get_user_account(self, username: str) -> UserAccount:
return UserAccount(username, "", "", UserType.Null, UserRight.Null, "/")
def get_user_account_by_tt_obj(self, obj: TeamTalkPy.UserAccount) -> UserAccount:
return UserAccount(
_str(obj.szUsername),
_str(obj.szPassword),
_str(obj.szNote),
UserType(obj.uUserType),
UserRight(obj.uUserRights),
_str(obj.szInitChannel),
)
def get_event(self, obj: TeamTalkPy.TTMessage) -> Event:
try:
channel = self.get_channel_from_obj(obj.channel)
except (UnicodeDecodeError, ValueError):
channel = Channel(1, "", "", 0, ChannelType.Default)
try:
error = self.get_error(obj.clienterrormsg.nErrorNo, obj.nSource)
except (UnicodeDecodeError, ValueError):
error = Error("", ErrorType.Success, 1)
try:
file = self.get_file(obj.remotefile)
except (UnicodeDecodeError, ValueError):
file = File(1, "", channel, 0, "")
try:
user_account = self.get_user_account_by_tt_obj(obj.useraccount)
except (UnicodeDecodeError, ValueError):
user_account = UserAccount("", "", "", UserType.Null, UserRight.Null, "")
try:
user = self.get_user(obj.user.nUserID)
except (UnicodeDecodeError, ValueError):
user = User(
1,
"",
"",
"",
UserStatusMode.M,
UserState.Null,
channel,
"",
1,
user_account,
UserType.Null,
False,
False,
)
try:
message = self.get_message(obj.textmessage)
except (UnicodeDecodeError, ValueError):
message = Message("", user, channel, MessageType.NONE)
return Event(
EventType(obj.nClientEvent),
obj.nSource,
channel,
error,
file,
message,
user,
user_account,
)
def get_input_devices(self) -> List[SoundDevice]:
devices: List[SoundDevice] = []
device_list = [i for i in self.tt.getSoundDevices()]
for device in device_list:
if sys.platform == "win32":
if (
device.nSoundSystem == TeamTalkPy.SoundSystem.SOUNDSYSTEM_WASAPI
and device.nMaxOutputChannels == 0
):
devices.append(
SoundDevice(
_str(device.szDeviceName),
device.nDeviceID,
SoundDeviceType.Input,
)
)
else:
devices.append(
SoundDevice(
_str(device.szDeviceName),
device.nDeviceID,
SoundDeviceType.Input,
)
)
return devices
def set_input_device(self, id: int) -> None:
self.tt.initSoundInputDevice(id)
def enable_voice_transmission(self) -> None:
self.tt.enableVoiceTransmission(True)
self.is_voice_transmission_enabled = True
def disable_voice_transmission(self) -> None:
self.tt.enableVoiceTransmission(False)
self.is_voice_transmission_enabled = False