using Godot;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Godot node that connects to the Twitch chat WebSocket endpoint, reads incoming IRC lines on a
/// background task, answers PING messages, and emits <c>IncomingCommand</c> for chat messages that
/// start with the trigger configured in <c>Settings.Trigger</c>.
/// </summary>
public partial class TwitchChatWatcher : Node
{
    private readonly ClientWebSocket Socket = new();

    // NOTE(review): the element type argument of this queue cannot be determined from this file
    // (nothing here reads or writes it). Restore the intended `ConcurrentQueue<T>` type argument.
    public readonly ConcurrentQueue Queue = new();

    private CancellationTokenSource TokenSource = new();

    /// <summary>Token observed by the connect, send, and background receive/handle operations.</summary>
    public CancellationToken Token => TokenSource.Token;

    private CommandHandler CommandHandler { get; set; }

    /// <summary>Current state of the underlying WebSocket.</summary>
    public WebSocketState State => Socket.State;

    [Export]
    public bool PrintAllIncoming { get; set; }

    private Settings Settings;

    [Signal]
    public delegate void IncomingCommandEventHandler(Command command);

    [Signal]
    public delegate void SocketConnectedEventHandler();

    [Signal]
    public delegate void SocketDisconnectedEventHandler();

    /// <summary>Name of the most recently joined channel, without the leading '#'; null until a join is sent.</summary>
    public string Channel { get; private set; }

    // Complete lines received from the socket, waiting to be parsed by HandleMessages.
    private readonly ConcurrentQueue<string> MessageStrings = new();

    /// <summary>Resolves the Settings and CommandHandler nodes from the scene tree root.</summary>
    /// <exception cref="Exception">Either node could not be resolved.</exception>
    public override void _Ready()
    {
        Settings = GetNode<Settings>("/root/Settings")
            ?? throw new Exception($"{nameof(Settings)} node not found");
        CommandHandler = GetNode<CommandHandler>("/root/CommandHandler")
            ?? throw new Exception($"{nameof(CommandHandler)} not found");
    }

    // Called every frame. 'delta' is the elapsed time since the previous frame.
    public override void _Process(double delta)
    {
    }

    /// <summary>
    /// Opens the WebSocket to Twitch chat (no-op if it is already open), starts the background
    /// receive and message-handling tasks, and emits <c>SocketConnected</c> on the next idle frame.
    /// </summary>
    /// <exception cref="Exception">The socket is not open after the connect attempt.</exception>
    public async Task ConnectAsync()
    {
        GD.Print("Connecting");
        if (Socket.State == WebSocketState.Open)
        {
            return;
        }

        await Socket.ConnectAsync(new Uri("wss://irc-ws.chat.twitch.tv:443"), Token);
        if (Socket.State != WebSocketState.Open)
        {
            throw new Exception("Failed to connect to Twitch");
        }

        // Fire-and-forget: both loops run until the token is cancelled or the socket closes.
        _ = Task.Run(GetPacketsTask, Token);
        _ = Task.Run(HandleMessages, Token);
        CallDeferred("emit_signal", nameof(SocketConnected));
    }

    /// <summary>Sends PASS followed by NICK.</summary>
    /// <param name="user">Nick to send; defaults to <c>justinfan</c> plus a random four-digit number.</param>
    /// <param name="pass">Password to send; defaults to the literal <c>pass</c>.</param>
    public async Task Authenticate(string user = null, string pass = null)
    {
        GD.Print("Authenticating");
        user ??= $"justinfan{Random.Shared.NextInt64(10000):D4}";
        pass ??= "pass";
        await SendMessageAsync(TwitchChatMessageType.PASS, parameters: new string[] { pass });
        await SendMessageAsync(TwitchChatMessageType.NICK, parameters: new string[] { user });
    }

    /// <summary>Requests the <c>twitch.tv/tags</c> capability.</summary>
    public async Task RequestTags()
    {
        await SendMessageAsync("CAP REQ :twitch.tv/tags");
    }

    /// <summary>
    /// Joins <paramref name="channel"/>, first sending PART for the previously joined channel if any.
    /// </summary>
    /// <param name="channel">Channel name, with or without leading '#' characters.</param>
    /// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
    public async Task JoinChannel(string channel)
    {
        ArgumentNullException.ThrowIfNull(channel);
        channel = channel.TrimStart('#');
        if (Channel is not null)
        {
            await SendMessageAsync(TwitchChatMessageType.PART, parameters: new string[] { "#" + Channel });
        }

        await SendMessageAsync(TwitchChatMessageType.JOIN, parameters: new string[] { "#" + channel });
        Channel = channel;
    }

    /// <summary>Sends <paramref name="message"/> as one UTF-8 text frame.</summary>
    /// <remarks>
    /// NOTE(review): ClientWebSocket documents that only one send may be outstanding at a time, and
    /// nothing here serializes sends (PONG replies run on a pool thread) - confirm callers cannot overlap.
    /// </remarks>
    public async Task SendMessageAsync(string message)
    {
        await Socket.SendAsync(Encoding.UTF8.GetBytes(message), WebSocketMessageType.Text, true, Token);
    }

    /// <summary>
    /// Formats and sends an IRC message: optional <c>@tags</c>, optional <c>:prefix</c>, the command,
    /// then the parameters with the last one sent as the trailing (<c>:</c>-prefixed) parameter.
    /// </summary>
    /// <param name="command">Command to send; rendered through <c>ToCommand()</c>.</param>
    /// <param name="parameters">Parameters; null or empty sends none.</param>
    /// <param name="tags">Message tags, emitted sorted by key with values escaped; null or empty sends none.</param>
    /// <param name="prefix">Message prefix; null or whitespace sends none.</param>
    public async Task SendMessageAsync(
        TwitchChatMessageType command,
        IEnumerable<string> parameters = null,
        IDictionary<string, string> tags = null,
        string prefix = null)
    {
        // Escapes the characters that may not appear raw inside a tag value.
        static string EscapeTagValue(string s)
        {
            if (s is null)
            {
                return "";
            }

            return string.Join("", s.Select(c => c switch
            {
                ';' => @"\:",
                ' ' => @"\s",
                '\\' => @"\\",
                '\r' => @"\r",
                '\n' => @"\n",
                _ => c.ToString(),
            }));
        }

        var builder = new StringBuilder();
        if (tags is not null && tags.Count != 0)
        {
            var renderedTags = tags
                .OrderBy(p => p.Key)
                .Select(p => $"{p.Key}={EscapeTagValue(p.Value)}");
            builder.Append('@').Append(string.Join(';', renderedTags)).Append(' ');
        }

        if (!string.IsNullOrWhiteSpace(prefix))
        {
            builder.Append(':').Append(prefix).Append(' ');
        }

        builder.Append(command.ToCommand());

        // Materialize once: the sequence may be lazy and is needed both for the count and the items.
        var parameterList = parameters?.ToList();
        if (parameterList is { Count: > 0 })
        {
            for (var i = 0; i < parameterList.Count - 1; i++)
            {
                builder.Append(' ').Append(parameterList[i]);
            }

            builder.Append(" :").Append(parameterList[^1]);
        }

        await SendMessageAsync(builder.ToString());
    }

    /// <summary>
    /// Background loop: receives data from the socket, splits it into CRLF-terminated lines, and
    /// enqueues each complete (trimmed) line for <see cref="HandleMessages"/>. Returns when the token
    /// is cancelled or the socket is no longer open (emitting <c>SocketDisconnected</c> in that case).
    /// </summary>
    private async Task GetPacketsTask()
    {
        try
        {
            var buffer = new byte[16 * 1024];
            var chars = new char[Encoding.UTF8.GetMaxCharCount(buffer.Length)];
            // A stateful decoder keeps the bytes of a multi-byte character that straddles two receives.
            var decoder = Encoding.UTF8.GetDecoder();
            var pending = "";

            while (!Token.IsCancellationRequested)
            {
                var res = await Socket.ReceiveAsync(buffer, Token);
                if (Token.IsCancellationRequested)
                {
                    return;
                }

                if (Socket.State != WebSocketState.Open)
                {
                    GD.PrintErr("Socket closed");
                    CallDeferred("emit_signal", nameof(SocketDisconnected));
                    return;
                }

                if (res.Count == 0)
                {
                    GD.PrintErr("Empty packet received");
                    continue;
                }

                var charCount = decoder.GetChars(buffer, 0, res.Count, chars, 0);
                pending += new string(chars, 0, charCount);
                //PrintQueue.Enqueue(stringData);

                // Split without trimming: the last entry is an incomplete line and must be kept verbatim
                // (including a trailing space or lone '\r') until the rest of it arrives.
                var lines = pending.Split("\r\n");
                pending = lines[^1];
                for (var i = 0; i < lines.Length - 1; i++)
                {
                    MessageStrings.Enqueue(lines[i].Trim());
                }
            }
        }
        finally
        {
            if (!Token.IsCancellationRequested)
            {
                GD.PushError($"{nameof(GetPacketsTask)} exited without cancellation");
            }
        }
    }

    /// <summary>
    /// Background loop: drains <see cref="MessageStrings"/>, then waits 50 ms before polling again,
    /// until the token is cancelled.
    /// </summary>
    private async Task HandleMessages()
    {
        try
        {
            while (!Token.IsCancellationRequested)
            {
                while (MessageStrings.TryDequeue(out string message))
                {
                    ProcessLine(message);
                }

                await Task.Delay(50);
            }
        }
        finally
        {
            if (!Token.IsCancellationRequested)
            {
                GD.PushError($"{nameof(HandleMessages)} exited without cancellation");
            }
        }
    }

    /// <summary>
    /// Handles one received line: prints it, answers PING with a PONG, and turns a PRIVMSG that starts
    /// with the configured trigger into a deferred <c>IncomingCommand</c> signal.
    /// </summary>
    private void ProcessLine(string message)
    {
        if (string.IsNullOrWhiteSpace(message))
        {
            return;
        }

        GD.Print(message);
        // if (PrintAllIncoming)
        //     PrintQueue.Enqueue(message);

        TwitchChatMessage tcm;
        try
        {
            tcm = TwitchChatMessage.Parse(message);
        }
        catch (Exception e)
        {
            // One malformed line must not end the loop, otherwise PINGs would stop being answered.
            GD.PushError($"Failed to parse chat message '{message}': {e.Message}");
            return;
        }

        if (tcm is null)
        {
            return;
        }

        if (tcm.MessageType == TwitchChatMessageType.PING)
        {
            _ = Task.Run(() => SendPong(tcm), Token);
            return;
        }

        if (tcm is not Privmsg p)
        {
            return;
        }

        var trig = Settings.Trigger;
        // No trigger configured: ignore this chat message but keep draining the queue.
        if (string.IsNullOrWhiteSpace(trig))
        {
            return;
        }

        // Ordinal so that the slice by trig.Length below removes exactly what was matched.
        if (!p.ChatMessage.StartsWith(trig, StringComparison.Ordinal))
        {
            return;
        }

        var chat = p.ChatMessage[trig.Length..].TrimStart();
        //TODO make better
        CallDeferred("emit_signal", SignalName.IncomingCommand, new Command(p.DisplayName, false, p.Moderator, chat));
    }

    /// <summary>Replies to <paramref name="ping"/> with a PONG carrying the ping's parameters, tags, and prefix.</summary>
    private async Task SendPong(TwitchChatMessage ping)
    {
        // NOTE(review): the result of MakePong was never used; the call is kept in case it has side
        // effects - confirm and either send this message or drop the call.
        _ = TwitchChatMessage.MakePong(ping);
        await SendMessageAsync(TwitchChatMessageType.PONG, ping.Parameters, ping.MessageTags, ping.Prefix);
        GD.Print("Sent Pong");
    }
}