from __future__ import annotations
import asyncio
import secrets
import string
import typing
from collections import deque
from typing import KeysView, Optional, ValuesView, Union, Any
import aiohttp
from discord.backoff import ExponentialBackoff
from discord.ext.commands import Bot
from . import log, ws_ll_log, ws_rll_log
from .enums import LavalinkEvents, LavalinkIncomingOp, LavalinkOutgoingOp, NodeState, PlayerState, FiltersOp
from .player import Player
from .rest_api import Track
from .tuples import *
from .utils import VoiceChannel
__all__ = ["Node", "NodeStats", "get_node", "get_nodes_stats", "Stats"]
_nodes: list[Node] = []
# Originally Added in: https://github.com/PythonistaGuild/Wavelink/pull/66
class _Key:
def __init__(self, key_len: int = 32):
self.length: int = key_len
self.persistent: str = ""
self.__repr__()
def __repr__(self) -> str:
"""Generate a new key, return it and make it persistent"""
alphabet = string.ascii_letters + string.digits + "#$%&()*+,-./:;<=>?@[]^_~!"
key = "".join(secrets.choice(alphabet) for _ in range(self.length))
self.persistent = key
return key
def __str__(self) -> str:
"""Return the persistent key."""
# Ensure output is not a non-string
# Since input could be Any object.
if not self.persistent:
return self.__repr__()
return str(self.persistent)
[docs]class Stats:
"""
The NodeStats class contains the performance stats of a certain node
Attributes
----------
uptime : int
players : int
active_players : int
memory : MemoryInfo
cpu_info : CPUInfo
"""
uptime: int
players: int
active_players: int
memory: MemoryInfo
cpu_info: CPUInfo
def __init__(self,
memory: dict[str, int],
players: int,
active_players: int,
cpu: dict[str, Union[str, float]],
uptime: int
):
self.memory = MemoryInfo(**memory)
self.players = players
self.active_players = active_players
self.cpu_info = CPUInfo(**cpu)
self.uptime = uptime
def __repr__(self) -> str:
return (
"<Stats: "
f"uptime={self.uptime}, "
f"players={self.players}, "
f"active_players={self.active_players}, "
f"memory={self.memory}, "
f"cpu_info={self.cpu_info}"
)
# Node stats related class below and how it is called is originally from:
# https://github.com/PythonistaGuild/Wavelink/blob/master/wavelink/stats.py#L41
# https://github.com/PythonistaGuild/Wavelink/blob/master/wavelink/websocket.py#L132
[docs]class NodeStats:
"""
The NodeStats class contains the performance stats of a certain node
Attributes
----------
uptime : int
players : int
playing_players : int
memory_free : int
memory_used : int
memory_allocated : int
memory_reservable : int
cpu_cores : int
system_load : float
lavalink_load : float
frames_sent : int
frames_nulled : int
frames_deficit : int
"""
uptime: int
players: int
playing_players: int
memory_free: int
memory_used: int
memory_allocated: int
memory_reservable: int
cpu_cores: int
system_load: float
lavalink_load: float
frames_sent: int
frames_nulled: int
frames_deficit: int
def __init__(self, data: dict[str, Any]):
"""
Initialize the object
Parameters
----------
data: dict
"""
self.uptime: int = data["uptime"]
self.players: int = data["players"]
self.playing_players: int = data["playingPlayers"]
memory: dict[str, int] = data["memory"]
self.memory_free: int = memory["free"]
self.memory_used: int = memory["used"]
self.memory_allocated: int = memory["allocated"]
self.memory_reservable: int = memory["reservable"]
cpu: dict[str, Union[int, float]] = data["cpu"]
self.cpu_cores: int = cpu["cores"]
self.system_load: float = cpu["systemLoad"]
self.lavalink_load: float = cpu["lavalinkLoad"]
frame_stats: Optional[dict[str, int]] = data.get("frameStats", {})
self.frames_sent: int = frame_stats.get("sent", -1)
self.frames_nulled: int = frame_stats.get("nulled", -1)
self.frames_deficit: int = frame_stats.get("deficit", -1)
def __repr__(self) -> str:
return (
"<NodeStats: "
f"uptime={self.uptime}, "
f"players={self.players}, "
f"playing_players={self.playing_players}, "
f"memory_free={self.memory_free}, memory_used={self.memory_used}, "
f"cpu_cores={self.cpu_cores}, system_load={self.system_load}, "
f"lavalink_load={self.lavalink_load}>"
)
[docs]class Node:
_is_shutdown: bool = False
def __init__(
self,
_loop: asyncio.BaseEventLoop,
event_handler: typing.Callable,
host: str,
password: str,
port: int,
user_id: int,
num_shards: int,
resume_key: Optional[str] = None,
resume_timeout: int = 60,
bot: Optional[Bot] = None,
):
"""
Represents a Lavalink node.
Parameters
----------
_loop : asyncio.BaseEventLoop
The event loop of the bot.
event_handler
Function to dispatch events to.
host : str
Lavalink player host.
password : str
Password for the Lavalink player.
port : int
Port of the Lavalink player event websocket.
user_id : int
User ID of the bot.
num_shards : int
Number of shards to which the bot is currently connected.
resume_key : Optional[str]
A resume key used for resuming a session upon re-establishing a WebSocket connection to Lavalink.
resume_timeout : int
How long the node should wait for a connection while disconnected before clearing all players.
bot: discord.ext.commands.Bot
The Bot object that's connect to discord.
"""
self.loop = _loop
self.bot = bot
self.event_handler = event_handler
self.host = host
self.port = port
self.password = password
self._resume_key = resume_key
if self._resume_key is None:
self._resume_key = self._gen_key()
self._resume_timeout = resume_timeout
self._resuming_configured = False
self.num_shards = num_shards
self.user_id = user_id
self._ready_event = asyncio.Event()
self._ws = None
self._listener_task = None
self.session = aiohttp.ClientSession()
self._queue = deque()
self._players_dict = {}
self.state = NodeState.CONNECTING
self._state_handlers = deque()
self._retries = 0
self.stats = None
if self not in _nodes:
_nodes.append(self)
self._closers = (
aiohttp.WSMsgType.CLOSE,
aiohttp.WSMsgType.CLOSING,
aiohttp.WSMsgType.CLOSED,
)
self.register_state_handler(self.node_state_handler)
def __repr__(self) -> str:
return (
"<Node: "
f"state={self.state.name}, "
f"host={self.host}, "
f"port={self.port}, "
f"password={'*' * len(self.password)}, resume_key={self._resume_key}, "
f"shards={self.num_shards}, user={self.user_id}, stats={self.stats}>"
)
@property
def headers(self) -> dict:
return self._get_connect_headers()
@property
def players(self) -> ValuesView[Player]:
return self._players_dict.values()
@property
def guild_ids(self) -> KeysView[int]:
return self._players_dict.keys()
def _gen_key(self):
if self._resume_key is None:
return _Key()
else:
# if this is a class then it will generate a persistent key
# We should not check the instance since
# we would still make 1 extra call to check, which is useless.
self._resume_key.__repr__()
return self._resume_key
[docs] async def connect(self, timeout: int = None, ssl: bool = False):
"""
Connects to the Lavalink player event websocket.
Parameters
----------
ssl: bool
Whether to use the `wss://` protocol.
timeout : int
Time after which to timeout on attempting to connect to the Lavalink websocket,
``None`` is considered never, but the underlying code may stop trying past a
certain point.
Raises
------
asyncio.TimeoutError
If the websocket failed to connect after the given time.
"""
self._is_shutdown = False
if ssl:
uri = f"wss://{self.host}:{self.port}"
else:
uri = f"ws://{self.host}:{self.port}"
ws_ll_log.info("Lavalink WS connecting to %s with headers %s", uri, self.headers)
await asyncio.wait_for(self._multi_try_connect(uri), timeout)
ws_ll_log.debug("Creating Lavalink WS listener.")
if self._listener_task is not None:
self._listener_task.cancel()
self._listener_task = self.loop.create_task(self.listener())
self.loop.create_task(self._configure_resume())
if self._queue:
for data in self._queue:
await self.send(data)
self._queue.clear()
self._ready_event.set()
self.update_state(NodeState.READY)
ws_ll_log.info("Lavalink WS connected to %s", uri)
async def _configure_resume(self):
if self._resuming_configured:
return
if self._resume_key and self._resume_timeout and self._resume_timeout > 0:
await self.send(
dict(
op="configureResuming",
key=str(self._resume_key),
timeout=self._resume_timeout,
)
)
self._resuming_configured = True
ws_ll_log.debug("Server Resuming has been configured.")
async def wait_until_ready(self, timeout: Optional[float] = None):
await asyncio.wait_for(self._ready_event.wait(), timeout=timeout)
def _get_connect_headers(self) -> dict:
headers = {
"Authorization": self.password,
"User-Id": str(self.user_id),
"Num-Shards": str(self.num_shards),
"Client-Name": "Useless-Lavalink"
}
if self._resume_key:
headers["Resume-Key"] = str(self._resume_key)
return headers
@property
def lavalink_major_version(self) -> str:
"""
Get the major version of the lavalink node
"""
if not self.ready:
raise RuntimeError("Node not ready!")
return self._ws.response_headers.get("Lavalink-Major-Version")
@property
def ready(self) -> bool:
"""
Whether the underlying node is ready for requests.
"""
return self.state == NodeState.READY
async def _multi_try_connect(self, uri):
backoff = ExponentialBackoff()
attempt = 1
if self._ws is not None:
await self._ws.close(code=4006, message=b"Reconnecting")
while self._is_shutdown is False and (self._ws is None or self._ws.closed):
self._retries += 1
try:
ws = await self.session.ws_connect(url=uri, headers=self.headers, heartbeat=60)
except (OSError, aiohttp.ClientConnectionError):
delay = backoff.delay()
ws_ll_log.error("Failed connect attempt %s, retrying in %s", attempt, delay)
await asyncio.sleep(delay)
attempt += 1
if attempt > 5:
raise asyncio.TimeoutError
except aiohttp.WSServerHandshakeError:
ws_ll_log.error("Failed connect WSServerHandshakeError")
raise asyncio.TimeoutError
else:
self.session_resumed = ws._response.headers.get("Session-Resumed", False)
if self._ws is not None and self.session_resumed:
ws_ll_log.info("WEBSOCKET Resumed Session with key: %s", self._resume_key)
self._ws = ws
break
[docs] async def listener(self):
"""
Listener task for receiving ops from Lavalink.
"""
while self._is_shutdown is False:
msg = await self._ws.receive()
if msg.type in self._closers:
if self._resuming_configured:
if self.state != NodeState.RECONNECTING:
ws_ll_log.info("[NODE] | NODE Resuming: %s", msg.extra)
self.update_state(NodeState.RECONNECTING)
self.loop.create_task(self._reconnect())
return
else:
ws_ll_log.info("[NODE] | Listener closing: %s", msg.extra)
break
elif msg.type == aiohttp.WSMsgType.TEXT:
data = msg.json()
try:
op = LavalinkIncomingOp(data.get("op"))
except ValueError:
ws_ll_log.info("[NODE] | Received unknown op: %s", data)
else:
ws_ll_log.debug("[NODE] | Received known op: %s", data)
self.loop.create_task(self._handle_op(op, data))
elif msg.type == aiohttp.WSMsgType.ERROR:
exc = self._ws.exception()
ws_ll_log.info("[NODE] | Exception in WebSocket!", exc_info=exc)
break
else:
ws_ll_log.info(
"[NODE] | WebSocket connection received unexpected message: %s:%s",
msg.type,
msg.data,
)
if self.state != NodeState.RECONNECTING:
ws_ll_log.warning(
"[NODE] | %s - WS %s SHUTDOWN %s.", self, not self._ws.closed, self._is_shutdown
)
self.update_state(NodeState.RECONNECTING)
self.loop.create_task(self._reconnect())
async def _handle_op(self, op: LavalinkIncomingOp, data: dict[str, Any]):
if op == LavalinkIncomingOp.EVENT:
try:
event = LavalinkEvents(data.get("type"))
except ValueError:
ws_ll_log.info("Unknown event type: %s", data)
else:
self.event_handler(op, event, data)
elif op == LavalinkIncomingOp.PLAYER_UPDATE:
state = data.get("state", {})
state = PositionTime(position=state.get("position", 0), time=state.get("time", 0),
connected=state.get("connected", False))
self.event_handler(op, state, data)
elif op == LavalinkIncomingOp.STATS:
stats = Stats(
memory=data.get("memory"),
players=data.get("players"),
active_players=data.get("playingPlayers"),
cpu=data.get("cpu"),
uptime=data.get("uptime"),
)
self.stats = NodeStats(data)
self.event_handler(op, stats, data)
else:
ws_ll_log.info("Unknown op type: %r", data)
async def _reconnect(self):
self._ready_event.clear()
if self._is_shutdown is True:
ws_ll_log.info("[NODE] | Shutting down Lavalink WS.")
return
if self.state != NodeState.CONNECTING:
self.update_state(NodeState.RECONNECTING)
if self.state != NodeState.RECONNECTING:
return
backoff = ExponentialBackoff(base=1)
attempt = 1
while self.state == NodeState.RECONNECTING:
attempt += 1
try:
await self.connect()
except asyncio.TimeoutError:
delay = backoff.delay()
ws_ll_log.info("[NODE] | Failed to reconnect to the Lavalink server.")
ws_ll_log.info(
"[NODE] | Lavalink WS reconnect connect attempt %s, retrying in %s",
attempt,
delay,
)
else:
ws_ll_log.info("[NODE] | Reconnect successful.")
self.dispatch_reconnect()
self._retries = 0
def dispatch_reconnect(self):
for guild_id in self.guild_ids:
self.event_handler(
LavalinkIncomingOp.EVENT,
LavalinkEvents.WEBSOCKET_CLOSED,
{
"guildId": guild_id,
"code": 42069,
"reason": "Lavalink WS reconnected",
"byRemote": True,
"retries": self._retries,
},
)
def update_state(self, next_state: NodeState):
if next_state == self.state:
return
ws_ll_log.debug("Changing node state: %s -> %s", self.state.name, next_state.name)
old_state = self.state
self.state = next_state
if self.loop.is_closed():
ws_ll_log.debug("Event loop closed, not notifying state handlers.")
return
for handler in self._state_handlers:
self.loop.create_task(handler(next_state, old_state))
def register_state_handler(self, func):
if not asyncio.iscoroutinefunction(func):
raise ValueError("Argument must be a coroutine object.")
if func not in self._state_handlers:
self._state_handlers.append(func)
def unregister_state_handler(self, func):
self._state_handlers.remove(func)
[docs] async def create_player(self, channel: VoiceChannel, deafen: bool = False) -> Player:
"""
Connects to a discord voice channel.
This function is safe to repeatedly call as it will return an existing
player if there is one.
Parameters
----------
deafen
channel
Returns
-------
Player
The created Player object.
"""
if self._already_in_guild(channel):
p = self.get_player(channel.guild.id)
await p.move_to(channel, deafen=deafen)
else:
p = await channel.connect(cls=Player)
if deafen:
await p.guild.change_voice_state(channel=p.channel, self_deaf=True)
self._players_dict[channel.guild.id] = p
await self.refresh_player_state(p)
return p
def _already_in_guild(self, channel: VoiceChannel) -> bool:
return channel.guild.id in self._players_dict
[docs] def get_player(self, guild_id: int) -> Player:
"""
Gets a Player object from a guild ID.
Parameters
----------
guild_id : int
Discord guild ID.
Returns
-------
Player
Raises
------
KeyError
If that guild does not have a Player, e.g. is not connected to any
voice channel.
"""
if guild_id in self._players_dict:
return self._players_dict[guild_id]
raise KeyError("No such player for that guild.")
async def node_state_handler(self, next_state: NodeState, old_state: NodeState):
ws_rll_log.debug("Received node state update: %s -> %s", old_state.name, next_state.name)
if next_state == NodeState.READY:
await self.update_player_states(PlayerState.READY)
elif next_state == NodeState.DISCONNECTING:
await self.update_player_states(PlayerState.DISCONNECTING)
elif next_state in (NodeState.CONNECTING, NodeState.RECONNECTING):
await self.update_player_states(PlayerState.NODE_BUSY)
async def update_player_states(self, state: PlayerState):
for p in self.players:
await p.update_state(state)
async def refresh_player_state(self, player: Player):
if self.ready:
await player.update_state(PlayerState.READY)
elif self.state == NodeState.DISCONNECTING:
await player.update_state(PlayerState.DISCONNECTING)
else:
await player.update_state(PlayerState.NODE_BUSY)
[docs] def add_player(self, guild_id: int, player: Player):
"""Register a player"""
self._players_dict[guild_id] = player
[docs] def remove_player(self, player: Player):
"""Remove a player"""
if player.state != PlayerState.DISCONNECTING:
log.error(
"Attempting to remove a player (%r) from player list with state: %s",
player,
player.state.name,
)
return
guild_id = player.channel.guild.id
if guild_id in self._players_dict:
del self._players_dict[guild_id]
[docs] async def disconnect(self):
"""
Shuts down and disconnects the websocket.
"""
self._is_shutdown = True
self._ready_event.clear()
self.update_state(NodeState.DISCONNECTING)
if self._resuming_configured:
await self.send(dict(op="configureResuming", key=None))
self._resuming_configured = False
for p in tuple(self.players):
await p.disconnect(force=True)
log.debug("Disconnected all players.")
if self._ws is not None and not self._ws.closed:
await self._ws.close()
if self._listener_task is not None and not self.loop.is_closed():
self._listener_task.cancel()
await self.session.close()
self._state_handlers = []
_nodes.remove(self)
ws_ll_log.info("Shutdown Lavalink WS.")
async def send(self, data):
if self._ws is None or self._ws.closed:
self._queue.append(data)
else:
ws_ll_log.debug("Sending data to Lavalink: %s", data)
await self._ws.send_json(data)
async def send_lavalink_voice_update(self, guild_id, session_id, event):
await self.send(
{
"op": LavalinkOutgoingOp.VOICE_UPDATE.value,
"guildId": str(guild_id),
"sessionId": session_id,
"event": event,
}
)
[docs] async def destroy_guild(self, guild_id: int):
"""
Disconnect from a guild and delete the player
Parameters
----------
guild_id : int
"""
await self.send({"op": LavalinkOutgoingOp.DESTROY.value, "guildId": str(guild_id)})
async def no_event_stop(self, guild_id: int):
await self.send({"op": LavalinkOutgoingOp.STOP.value, "guildId": str(guild_id)})
# Player commands
[docs] async def stop(self, guild_id: int):
"""
Stop the player on the given guild id
Parameters
----------
guild_id : int
"""
await self.no_event_stop(guild_id=guild_id)
self.event_handler(
LavalinkIncomingOp.EVENT, LavalinkEvents.QUEUE_END, {"guildId": str(guild_id)}
)
async def no_stop_play(
self,
guild_id: int,
track: Track,
replace: bool = True,
start: int = 0,
pause: bool = False,
):
await self.send(
{
"op": LavalinkOutgoingOp.PLAY.value,
"guildId": str(guild_id),
"track": track.track_identifier,
"noReplace": not replace,
"startTime": str(start),
"pause": pause,
}
)
[docs] async def play(
self,
guild_id: int,
track: Track,
replace: bool = True,
start: int = 0,
pause: bool = False,
):
"""
Start playing on the given guild id
Parameters
----------
guild_id : int
track : Track
The track to play
replace : bool
If set to true, stop the current song and play the given one
start : int
The number of milliseconds to offset the track by
pause : bool
Pause the current track
"""
# await self.send({"op": LavalinkOutgoingOp.STOP.value, "guildId": str(guild_id)})
await self.no_stop_play(
guild_id=guild_id, track=track, replace=replace, start=start, pause=pause
)
[docs] async def pause(self, guild_id: int, paused: bool):
"""
Pause the current track
Parameters
----------
guild_id : int
paused : bool
If set to True pause the track, otherwise unpause it
"""
await self.send(
{"op": LavalinkOutgoingOp.PAUSE.value, "guildId": str(guild_id), "pause": paused}
)
[docs] async def volume(self, guild_id: int, _volume: int):
"""
Set the volume of the track
Parameters
----------
guild_id : int
_volume : int
Volume may range from 0 to 1000
"""
await self.send(
{"op": LavalinkOutgoingOp.VOLUME.value, "guildId": str(guild_id), "volume": _volume}
)
[docs] async def seek(self, guild_id: int, position: int):
"""
Seek the current track
Parameters
----------
guild_id : int
position : int
The position is in milliseconds.
"""
await self.send(
{"op": LavalinkOutgoingOp.SEEK.value, "guildId": str(guild_id), "position": position}
)
[docs] async def equalizer(self, guild_id: int, bands: list[EqualizerBands]):
"""
Set the equalizer
Parameters
----------
guild_id : int
bands : list[EqualizerBands]
A list of bands to change
"""
await self.send(
{
"op": LavalinkOutgoingOp.FILTERS.value,
"guildId": str(guild_id),
FiltersOp.EQUALIZER.value: [{"band": band.band, "gain": band.gain} for band in bands]
}
)
[docs] async def karaoke(self, guild_id: int, level: float = 1.0, mono_level: float = 1.0, filter_band: float = 220.0,
filter_width: float = 100.0):
"""
Uses equalization to eliminate part of a band, usually targeting vocals.
Parameters
----------
guild_id : int
level : float
how much to filter
mono_level : float
how much to filter
filter_band : float
the frequency band to filter
filter_width : float
the frequency width to filter
"""
await self.send(
{
"op": LavalinkOutgoingOp.FILTERS.value,
"guildId": str(guild_id),
FiltersOp.KARAOKE.value: {
"level": level,
"monoLevel": mono_level,
"filterBand": filter_band,
"filterWidth": filter_width
}
}
)
[docs] async def time_scale(self, guild_id: int, speed: float = 1.0, pitch: float = 1.0, rate: float = 1.0):
"""
Changes the speed, pitch, and rate. All default to 1.
Parameters
----------
guild_id : int
speed : float
Should be >= 0
pitch : float
Should be >= 0
rate : float
Should be >= 0
"""
await self.send(
{
"op": LavalinkOutgoingOp.FILTERS.value,
"guildId": str(guild_id),
FiltersOp.TIMESCALE.value: {
"speed": speed,
"pitch": pitch,
"rate": rate,
}
}
)
[docs] async def tremolo(self, guild_id: int, frequency: float = 2.0, depth: float = 0.5):
"""
Uses amplification to create a shuddering effect, where the volume quickly oscillates.
Parameters
----------
guild_id : int
frequency : float
Should be >= 0
depth : float
0 < x ≤ 1
"""
if not (0 < depth <= 1):
raise ValueError("Depth must be 0 < x ≤ 1")
if frequency <= 0:
raise ValueError("Frequency must be greater than 0")
await self.send(
{
"op": LavalinkOutgoingOp.FILTERS.value,
"guildId": str(guild_id),
FiltersOp.TREMOLO.value: {
"frequency": frequency,
"depth": depth,
}
}
)
[docs] async def vibrato(self, guild_id: int, frequency: float = 2.0, depth: float = 0.5):
"""
Similar to tremolo. While tremolo oscillates the volume, vibrato oscillates the pitch.
Parameters
----------
guild_id : int
frequency : float
0 < x ≤ 14
depth : float
0 < x ≤ 1
"""
if not (0 < depth <= 1):
raise ValueError("Depth must be 0 < x ≤ 1")
if not (0 < frequency <= 14):
raise ValueError("Frequency must be 0 < x ≤ 14")
await self.send(
{
"op": LavalinkOutgoingOp.FILTERS.value,
"guildId": str(guild_id),
FiltersOp.VIBRATO.value: {
"frequency": frequency,
"depth": depth,
}
}
)
[docs] async def rotation(self, guild_id: int, rotation: int = 0):
"""
Rotates the sound around the stereo channels/user headphones aka Audio Panning.
It can produce an effect similar to: https://youtu.be/QB9EB8mTKcc (without the reverb)
Parameters
----------
guild_id : int
rotation : Optional[int]
The frequency of the audio rotating around the listener in Hz
"""
await self.send(
{
"op": LavalinkOutgoingOp.FILTERS.value,
"guildId": str(guild_id),
FiltersOp.ROTATION.value: {
"rotation": rotation,
}
}
)
[docs] async def distortion(self, guild_id: int, sin_offset: int = 0, sin_scale: int = 1, cos_offset: int = 0,
cos_scale: int = 1, tan_offset: int = 0, tan_scale: int = 1, offset: int = 0, scale: int = 1):
"""
Distortion effect
Parameters
----------
guild_id : int
sin_offset : int
sin_scale : int
cos_offset : float
cos_scale : float
tan_offset : float
tan_scale : float
offset : float
scale : float
"""
await self.send(
{
"op": LavalinkOutgoingOp.FILTERS.value,
"guildId": str(guild_id),
FiltersOp.DISTORTION.value: {
"sinOffset": sin_offset,
"sinScale": sin_scale,
"cosOffset": cos_offset,
"cosScale": cos_scale,
"tanOffset": tan_offset,
"tanScale": tan_scale,
"offset": offset,
"scale": scale
}
}
)
[docs] async def channel_mix(self, guild_id: int, left_to_left: float = 1.0, left_to_right: float = 0.0,
right_to_left: float = 0.0, right_to_right: float = 1.0):
"""
Mixes both channels (left and right), with a configurable factor on how much each channel affects the other.
With the defaults, both channels are kept independent from each other.
Setting all factors to 0.5 means both channels get the same audio.
Parameters
----------
guild_id : int
left_to_left : float
left_to_right : float
right_to_left : float
right_to_right : float
"""
await self.send(
{
"op": LavalinkOutgoingOp.FILTERS.value,
"guildId": str(guild_id),
FiltersOp.CHANNEL_MIX.value: {
"leftToLeft": left_to_left,
"leftToRight": left_to_right,
"rightToLeft": right_to_left,
"rightToRight": right_to_right,
}
}
)
[docs] async def low_pass(self, guild_id: int, smoothing: float = 0.0):
"""
Higher frequencies get suppressed, while lower frequencies pass through this filter, thus the name low pass.
Parameters
----------
guild_id : int
smoothing : float
how much to suppress
"""
await self.send(
{
"op": LavalinkOutgoingOp.FILTERS.value,
"guildId": str(guild_id),
FiltersOp.LOW_PASS.value: {
"smoothing": smoothing,
}
}
)
[docs] async def reset_filter(self, guild_id: int):
"""
Remove any applied filter
Parameters
----------
guild_id : int
"""
await self.send(
{
"op": LavalinkOutgoingOp.FILTERS.value,
"guildId": str(guild_id)
}
)
def get_node(guild_id: int = None, ignore_ready_status: bool = False) -> Node:
"""
Gets a node based on a guild ID, useful for noding separation. If the
guild ID does not already have a node association, the least used
node is returned. Skips over nodes that are not yet ready.
Parameters
----------
guild_id : int
ignore_ready_status : bool
Returns
-------
Node
"""
guild_count = 1e10
least_used = None
for node in _nodes:
guild_ids = node.guild_ids
if ignore_ready_status is False and not node.ready:
continue
elif len(guild_ids) < guild_count:
guild_count = len(guild_ids)
least_used = node
if guild_id in guild_ids:
return node
if least_used is None:
raise IndexError("No nodes found.")
return least_used
def get_nodes_stats() -> list[NodeStats]:
"""
Get the stats of all Nodes
Returns
-------
list[NodeStats]
"""
return [node.stats for node in _nodes]
async def disconnect():
"""Disconnect all nodes"""
for node in _nodes.copy():
await node.disconnect()