Source code for lavalink.player

from __future__ import annotations

import asyncio
import datetime
import random
from collections import deque
from random import shuffle
from typing import TYPE_CHECKING, Optional, Any, Union

import discord
from discord.backoff import ExponentialBackoff
from discord.ext.commands import Bot
from discord.voice_client import VoiceProtocol

from . import log, ws_rll_log
from .enums import (
    LavalinkEvents,
    LavalinkIncomingOp,
    LavalinkOutgoingOp,
    PlayerState,
    TrackEndReason,
)
from .rest_api import RESTClient, Track
from .tuples import EqualizerBands, PositionTime

if TYPE_CHECKING:
    from .node import Node

__all__ = ["Player"]


[docs]class Player(RESTClient, VoiceProtocol): """ The Player class represents the current state of playback. It also is used to control playback and queue tracks. The existence of this object guarantees that the bot is connected to a voice channel. Attributes ---------- channel: discord.VoiceChannel The channel the bot is connected to. queue : deque[Track] list of Track position : int The seeked position in the track of the current playback. current : Track The current playing track repeat : bool Repeat the current track loop_queue: bool Repeat the current queue shuffle : bool The newly added tracks to the queue gets shuffled """ channel: discord.VoiceChannel queue: deque[Track] position: int current: Track repeat: bool loop_queue: bool shuffle: bool def __call__(self, client: Bot, channel: discord.VoiceChannel): self.client: Bot = client self.channel: discord.VoiceChannel = channel return self def __init__( self, client: Bot = None, channel: discord.VoiceChannel = None, node: "Node" = None, ssl: bool = False ): """ Initialize the player Parameters ---------- client: discord.ext.commands.Bot The bot instance channel : discord.VoiceChannel The channel to connect to node : Node The node to use ssl : bool Whether to use the `https://` protocol. """ self.client: Bot = client self.channel: discord.VoiceChannel = channel self.guild: discord.Guild = channel.guild self._last_channel_id: int = channel.id self.queue: deque[Track] = deque() self.position = 0 self.current: Optional[Track] = None self._paused = False self.repeat = False self.loop_queue = False self.shuffle = False self.shuffle_bumped = True self._is_autoplaying = False self._auto_play_sent = False self._volume = 100 self.state = PlayerState.CREATED self._voice_state = {} self.connected_at = None self._connected = False self._is_playing = False self._metadata = {} if node is None: from .node import get_node self.node = get_node() else: self.node = node self._con_delay = None self._last_resume = None self.voice_state = {} super().__init__(self, ssl) def __repr__(self): return ( "<Player: " f"state={self.state.name}, connected={self.connected}, " f"guild={self.guild.name!r} ({self.guild.id}), " f"channel={self.channel.name!r} ({self.channel.id}), " f"playing={self.is_playing}, paused={self.paused}, volume={self.volume}, " f"queue_size={len(self.queue)}, current={self.current!r}, " f"position={self.position}, " f"length={self.current.length if self.current else 0}, node={self.node!r}>" ) @property def is_auto_playing(self) -> bool: """ Current status of playback """ return self._is_playing and not self._paused and self._is_autoplaying @property def is_playing(self) -> bool: """ Current status of playback """ return self._is_playing and not self._paused and self._connected @property def paused(self) -> bool: """ Player's paused state. """ return self._paused @property def volume(self) -> int: """ The current volume. """ return self._volume @property def ready(self) -> bool: """ Whether the underlying node is ready for requests. """ return self.node.ready @property def connected(self) -> bool: """ Whether the player is ready to be used. """ return self._connected
[docs] async def on_voice_server_update(self, data: dict[str, Any]): """ Send voice server update data to the node Parameters ---------- data : dict[str, Any] The data to send """ self._voice_state.update({"event": data}) await self._send_lavalink_voice_update(self._voice_state)
[docs] async def on_voice_state_update(self, data: dict): """ Send voice state update data to the node Parameters ---------- data : dict[str, Any] The data to send """ self._voice_state.update({"sessionId": data["session_id"]}) if (channel_id := data["channel_id"]) is None: ws_rll_log.info("Received voice disconnect from discord, removing player.") self._voice_state.clear() await self.disconnect(force=True) else: channel = self.guild.get_channel(int(channel_id)) if channel != self.channel: if self.channel: self._last_channel_id = self.channel.id self.channel = channel await self._send_lavalink_voice_update({**self._voice_state, "event": data})
async def _send_lavalink_voice_update(self, voice_state: dict): if self._voice_state.keys() == {"sessionId", "event"}: await self.node.send( { "op": LavalinkOutgoingOp.VOICE_UPDATE.value, "guildId": str(self.guild.id), "sessionId": voice_state["sessionId"], "event": voice_state["event"], } )
[docs] async def wait_until_ready( self, timeout: Optional[float] = None, no_raise: bool = False ) -> bool: """ Waits for the underlying node to become ready. Parameters ---------- timeout : Optional[float] The time to wait for the node to get ready. A timeout of None means to wait indefinitely. no_raise: bool If no_raise is set, returns false when a timeout occurs instead of propagating TimeoutError. """ if self.node.ready: return True try: return await self.node.wait_until_ready(timeout=timeout) except asyncio.TimeoutError: if no_raise: return False else: raise
[docs] async def connect(self, timeout: float = 2.0, reconnect: bool = False, deafen: bool = False): """ Connects to the voice channel associated with this Player. Parameters ---------- deafen: bool Prevent the bot from listening others' timeout : float Actually is not used, but must be present to match the signature reconnect: bool Actually is not used, but must be present to match the signature """ self._last_resume = datetime.datetime.now(datetime.timezone.utc) self.connected_at = datetime.datetime.now(datetime.timezone.utc) self._connected = True self.node.add_player(self.guild.id, self) await self.node.refresh_player_state(self) await self.guild.change_voice_state( channel=self.channel, self_mute=False, self_deaf=deafen )
[docs] async def move_to(self, channel: discord.VoiceChannel, deafen: bool = False): """ Moves this player to a voice channel. Parameters ---------- deafen : bool Prevent the bot from listening others channel : discord.VoiceChannel The channel to move to Raises ------ TypeError If the channel is in another guild """ if channel.guild != self.guild: raise TypeError(f"Cannot move {self!r} to a different guild.") if self.channel: self._last_channel_id = self.channel.id self.channel = channel await self.connect(deafen=deafen) if self.current: await self.resume( track=self.current, replace=True, start=self.position, pause=self._paused )
[docs] async def disconnect(self, force: bool = False): """ Disconnects this player from its voice channel. Parameters ---------- force : bool Force the player to disconnect """ self._is_autoplaying = False self._auto_play_sent = False self._connected = False if self.state == PlayerState.DISCONNECTING: return await self.update_state(PlayerState.DISCONNECTING) guild_id = self.guild.id if force: log.debug("Forcing player disconnect for %r", self) self.node.event_handler( LavalinkIncomingOp.EVENT, LavalinkEvents.FORCED_DISCONNECT, { "guildId": guild_id, "code": 42069, "reason": "Forced Disconnect - Do not Reconnect", "byRemote": True, "retries": -1, }, ) await self.guild.change_voice_state(channel=None) await self.node.destroy_guild(guild_id) self.node.remove_player(self) self.cleanup()
[docs] def store(self, key: Union[str, int], value: Any): """ Stores a metadata value by key. Parameters ---------- key : Union[str, int] value : Any """ self._metadata[key] = value
[docs] def fetch(self, key: Union[str, int], default: Optional[Any] = None) -> Any: """ Returns a stored metadata value. Parameters ---------- key Used to store metadata. default Optional, used if the key doesn't exist. Returns ------- Any If the key doesn't exist returns the default value, otherwise the set value """ return self._metadata.get(key, default)
[docs] async def update_state(self, state: PlayerState): """ Change the current player state Parameters ---------- state: PlayerState The state to set """ if state == self.state: return ws_rll_log.debug("Player %r changing state: %s -> %s", self, self.state.name, state.name) self.state = state if self._con_delay: self._con_delay = None
[docs] async def handle_event(self, event: "LavalinkEvents", extra: Any): """ Handles various Lavalink Events. If the event is TRACK END, extra will be TrackEndReason. If the event is TRACK EXCEPTION, extra will be the string reason. If the event is TRACK STUCK, extra will be the threshold ms. Parameters ---------- event : node.LavalinkEvents extra : Any """ log.debug("Received player event for player: %r - %r - %r.", self, event, extra) if event == LavalinkEvents.TRACK_END: if extra == TrackEndReason.FINISHED: await self.play() else: self._is_playing = False elif event == LavalinkEvents.WEBSOCKET_CLOSED: code = extra.get("code") if code in (4015, 4014, 4009, 4006, 4000, 1006): if not self._con_delay: self._con_delay = ExponentialBackoff(base=1)
[docs] async def handle_player_update(self, state: "PositionTime"): """ Handles player updates from lavalink. Parameters ---------- state : websocket.PlayerState """ if state.position > self.position: self._is_playing = True log.debug("Updated player position for player: %r - %ds.", self, state.position // 1000) self.position = state.position
# Play commands
[docs] def add(self, requester: discord.User, track: Track): """ Adds a track to the queue. Parameters ---------- requester : discord.User Who requested the track. track : Track Result from any of the lavalink track search methods. """ track.requester = requester self.queue.append(track)
[docs] def maybe_shuffle(self, sticky_songs: int = 1): """Shuffle""" if self.shuffle and self.queue: # Keeps queue order consistent unless adding new tracks self.force_shuffle(sticky_songs)
[docs] def force_shuffle(self, sticky_songs: int = 1): """Shuffle the queue""" if not self.queue: return sticky = max(0, sticky_songs) # Songs to bypass shuffle # Keeps queue order consistent unless adding new tracks if sticky > 0: to_keep = self.queue[:sticky] to_shuffle = self.queue[sticky:] else: to_shuffle = self.queue to_keep = [] if not self.shuffle_bumped: to_keep_bumped = [t for t in to_shuffle if t.extras.get("bumped", None)] to_shuffle = [t for t in to_shuffle if not t.extras.get("bumped", None)] to_keep.extend(to_keep_bumped) # Shuffles whole queue shuffle(to_shuffle) to_keep.extend(to_shuffle) # Keep next track in queue consistent while adding new tracks self.queue = to_keep
[docs] async def play(self): """ Starts playback from lavalink. """ if self.repeat and self.current is not None: self.queue.append(self.current) self.current = None self.position = 0 self._paused = False if not self.queue: await self.stop() else: self._is_playing = True track = self.queue.pop() if self.loop_queue: if self.current is not None: self.queue.append(self.current) else: self.queue.append(track) self.current = track log.debug("Assigned current track for player: %r.", self) await self.node.play(self.guild.id, track, start=track.start_timestamp, replace=True)
async def resume( self, track: Track, replace: bool = True, start: int = 0, pause: bool = False ): log.debug("Resuming current track for player: %r.", self) self._is_playing = False self._paused = True await self.node.play(self.guild.id, track, start=start, replace=replace, pause=True) await self.pause(True) await self.pause(pause, timed=1)
[docs] async def stop(self): """ Stops playback from lavalink. .. important:: This method will clear the queue. """ await self.node.stop(self.guild.id) self.queue = deque() self.current = None self.position = 0 self._paused = False self._is_autoplaying = False self._auto_play_sent = False
[docs] async def skip(self): """ Skips to the next song. """ await self.play()
[docs] async def pause(self, pause: bool = True, timed: Optional[int] = None): """ Pauses the current song. Parameters ---------- pause : bool Set to ``False`` to resume. timed : Optional[int] If an int is given the op will be called after it. """ if timed is not None: await asyncio.sleep(timed) self._paused = pause await self.node.pause(self.guild.id, pause)
[docs] async def set_volume(self, volume: int): """ Sets the volume of Lavalink. Parameters ---------- volume : int Between 0 and 150 """ self._volume = max(min(volume, 150), 0) await self.node.volume(self.guild.id, self.volume)
[docs] async def seek(self, position: int): """ If the track allows it, seeks to a position. Parameters ---------- position : int Between 0 and track length. """ if self.current.seekable: position = max(min(position, self.current.length), 0) await self.node.seek(self.guild.id, position)
[docs] async def bass_boost(self): """Bass boost the song""" await self.node.equalizer(self.guild.id, [EqualizerBands(0, 0.15), EqualizerBands(1, 0.15), EqualizerBands(2, 0.15)])
[docs] async def nightcore(self): """Apply the effect nightcore on the song""" await self.node.time_scale(self.guild.id, speed=1.20, pitch=1.1, rate=1.20)
[docs] async def random_distortion(self): """ Apply a random distortion .. warning:: It can be extremely painful to listen to """ await self.node.distortion( self.guild.id, random.randrange(0, 10), random.randrange(0, 10), random.randrange(0, 10), random.randrange(0, 10), random.randrange(0, 10), random.randrange(0, 10), random.randrange(0, 10), random.randrange(0, 10) )
[docs] async def reset_filter(self): """Remove any applied filter""" await self.node.reset_filter(self.guild.id)
[docs] async def equalizer(self, band: int, gain: float = 0.25): """ Change the equalizer Parameters ---------- band: int 0 ≤ x ≤ 14 gain: float is the multiplier for the given band -0.25 ≤ x ≤ 1 """ await self.node.equalizer(self.guild.id, [EqualizerBands(band, gain)])
[docs] async def karaoke(self, level: float = 1.0, mono_level: float = 1.0, filter_band: float = 220.0, filter_width: float = 100.0): """ Remove the vocals Parameters ---------- level : float how much to filter mono_level : float how much to filter filter_band : float the frequency band to filter filter_width : float the frequency width to filter """ await self.node.karaoke(self.guild.id, level, mono_level, filter_band, filter_width)
[docs] async def rotation(self, rotation: Optional[int] = None): """ Rotates the sound around the stereo channels/user headphones Audio Panning Parameters ---------- rotation : Optional[int] The frequency of the audio rotating around the listener in Hz """ if rotation is None: rotation = random.randint(0, 60) await self.node.rotation(self.guild.id, rotation=rotation)
[docs] async def timescale(self, speed: float = 1.0, pitch: float = 1.0, rate: float = 1.0): """ Changes the speed, pitch, and rate Parameters ---------- speed : float Should be >= 0 pitch : float Should be >= 0 rate : float Should be >= 0 """ await self.node.time_scale(self.guild.id, speed, pitch, rate)
[docs] async def vibrato(self, frequency: float = 2.0, depth: float = 0.5): """ Oscillates the pitch. Parameters ---------- frequency : float 0 < x ≤ 14 depth : float 0 < x ≤ 1 """ await self.node.vibrato(self.guild.id, frequency, depth)
[docs] async def tremolo(self, frequency: float = 2.0, depth: float = 0.5): """ Uses amplification to create a shuddering effect. Oscillates the volume Parameters ---------- frequency : float Should be >= 0 depth : float 0 < x ≤ 1 """ await self.node.tremolo(self.guild.id, frequency, depth)
[docs] async def distortion(self, sin_offset: float = 0, sin_scale: float = 1, cos_offset: float = 0, cos_scale: float = 1, tan_offset: float = 0, tan_scale: float = 1, offset: float = 0, scale: float = 1): """ Distortion effect Parameters ---------- sin_offset : int sin_scale : int cos_offset : float cos_scale : float tan_offset : float tan_scale : float offset : float scale : float """ await self.node.distortion(self.guild.id, sin_offset, sin_scale, cos_offset, cos_scale, tan_offset, tan_scale, offset, scale)
[docs] async def channel_mix(self, left_to_left: float = 1.0, left_to_right: float = 0.0, right_to_left: float = 0.0, right_to_right: float = 1.0): """ Mixes both channels (left and right) Parameters ---------- left_to_left : float left_to_right : float right_to_left : float right_to_right : float """ await self.node.channel_mix(self.guild.id, left_to_left, left_to_right, right_to_left, right_to_right)
[docs] async def low_pass(self, smoothing: float = 20.0): """ Higher frequencies get suppressed, while lower frequencies pass through this filter Parameters ---------- smoothing : float how much to suppress """ await self.node.low_pass(self.guild.id, smoothing)