basically function but needs a lot of refinement

This commit is contained in:
2024-04-23 03:24:23 -05:00
parent 3ac7bf33c4
commit d3beca8014
26 changed files with 985 additions and 345 deletions

View File

@@ -15,22 +15,35 @@ public partial class TwitchChatWatcher : Node
private readonly CancellationTokenSource TokenSource = new();
public CancellationToken Token => TokenSource.Token;
private CommandHandler CommandHandler { get; set; }
public WebSocketState State => Socket.State;
[Export]
public bool PrintAllIncoming { get; set; }
private Settings Settings;
[Signal]
public delegate void IncomingCommandEventHandler(Command command);
// Called when the node enters the scene tree for the first time.
public override void _Ready()
{
CommandHandler = GetNode<CommandHandler>("/root/CommandHandler");
Settings = GetNode<Settings>("/root/Settings")
?? throw new Exception($"{nameof(Settings)} node not found");
CommandHandler = GetNode<CommandHandler>("/root/CommandHandler")
?? throw new Exception($"{nameof(Command)} not found");
}
private readonly ConcurrentQueue<string> PrintQueue = new();
private readonly ConcurrentQueue<string> ErrorQueue = new();
// Called every frame. 'delta' is the elapsed time since the previous frame.
public override void _Process(double delta)
{
if (PrintQueue.TryDequeue(out string s))
GD.Print(s);
if (ErrorQueue.TryDequeue(out string e))
GD.PrintErr(e);
}
public async Task ConnectAsync()
{
GD.Print("Connecting");
if (Socket.State == WebSocketState.Open)
return;
await Socket.ConnectAsync(new Uri("wss://irc-ws.chat.twitch.tv:443"), Token);
@@ -40,15 +53,22 @@ public partial class TwitchChatWatcher : Node
}
public async Task Authenticate(string user = null, string pass = null)
{
GD.Print("Authenticating");
user ??= $"justinfan{Random.Shared.NextInt64(10000):D4}";
pass ??= "pass";
await SendMessageAsync(TwitchChatMessageType.PASS, parameters: new string[] { pass });
await SendMessageAsync(TwitchChatMessageType.NICK, parameters: new string[] { user });
}
public async Task RequestTags()
{
await SendMessageAsync("CAP REQ :twitch.tv/tags");
}
public async Task JoinChannel(string channel)
{
GD.Print("Joining channel");
channel = channel.TrimStart('#');
await SendMessageAsync(TwitchChatMessageType.JOIN, parameters: new string[] {"#" + channel});
await SendMessageAsync(TwitchChatMessageType.JOIN,
parameters: new string[] {"#" + channel});
}
public async Task SendMessageAsync(string message)
{
@@ -58,7 +78,7 @@ public partial class TwitchChatWatcher : Node
public async Task SendMessageAsync(TwitchChatMessageType command, IEnumerable<string> parameters = null,
IDictionary<string, string> tags = null, string prefix = null)
{
string EscapeTagValue(string s)
static string EscapeTagValue(string s)
{
if (s is null)
return "";
@@ -89,41 +109,103 @@ public partial class TwitchChatWatcher : Node
}
await SendMessageAsync(message);
}
private static ulong PacketCount;
private async Task GetPacketsTask()
{
var buffer = ArraySegment<byte>.Empty;
var stringData = "";
while (!Token.IsCancellationRequested)
try
{
var res = await Socket.ReceiveAsync(buffer, Token);
if (Token.IsCancellationRequested)
return;
stringData += Encoding.UTF8.GetString(buffer);
var lines = stringData.Split("\r\n", StringSplitOptions.TrimEntries);
stringData = lines.Last();
foreach (var line in lines.SkipLast(1))
MessageStrings.Enqueue(line);
var arr = new byte[16 * 1024];
var stringData = "";
while (!Token.IsCancellationRequested)
{
var res = await Socket.ReceiveAsync(arr, Token);
if (Token.IsCancellationRequested)
return;
if (Socket.State != WebSocketState.Open)
{
ErrorQueue.Enqueue("Socket closed");
return;
}
if (res.Count == 0)
{
ErrorQueue.Enqueue("Empty packet received");
continue;
}
PacketCount++;
PrintQueue.Enqueue($"Packet count: {PacketCount}");
stringData += Encoding.UTF8.GetString(arr, 0, res.Count);
//PrintQueue.Enqueue(stringData);
var lines = stringData.Split("\r\n", StringSplitOptions.TrimEntries);
if (!lines.Any())
continue;
stringData = lines.Last();
PrintQueue.Enqueue($"Line count: {lines.SkipLast(1).Count()}");
foreach (var line in lines.SkipLast(1))
MessageStrings.Enqueue(line);
}
}
catch (Exception e)
{
ErrorQueue.Enqueue(e.ToString());
}
finally
{
if (!Token.IsCancellationRequested)
ErrorQueue.Enqueue($"{nameof(GetPacketsTask)} exited without cancellation");
else
PrintQueue.Enqueue($"{nameof(GetPacketsTask)} cancelled and exited");
}
}
private readonly ConcurrentQueue<string> MessageStrings = new();
private void HandleMessages()
private async Task HandleMessages()
{
while (MessageStrings.TryDequeue(out string message))
try
{
var tcm = TwitchChatMessage.Parse(message);
if (tcm.MessageType == TwitchChatMessageType.PING)
_ = SendPong(tcm);
else if (tcm is Privmsg p)
while (!Token.IsCancellationRequested)
{
EmitSignal(SignalName.IncomingCommand, new Command(p.DisplayName,
false, p.Moderator, p.ChatMessage));
while (MessageStrings.TryDequeue(out string message))
{
if (string.IsNullOrWhiteSpace(message))
continue;
PrintQueue.Enqueue(message);
// if (PrintAllIncoming)
// PrintQueue.Enqueue(message);
var tcm = TwitchChatMessage.Parse(message);
if (tcm.MessageType == TwitchChatMessageType.PING)
_ = Task.Run(() => SendPong(tcm), Token);
else if (tcm is Privmsg p)
{
var com = Settings.Command;
if (!p.ChatMessage.StartsWith(com))
continue;
var chat = p.ChatMessage;
chat = chat[com.Length..].TrimStart();
CallDeferred("emit_signal", SignalName.IncomingCommand,
new Command(p.DisplayName,
false, p.Moderator, chat));
}
}
await Task.Delay(50);
}
}
catch (Exception e)
{
ErrorQueue.Enqueue(e.ToString() + System.Environment.NewLine
+ e.StackTrace);
}
finally
{
if (!Token.IsCancellationRequested)
ErrorQueue.Enqueue($"{nameof(HandleMessages)} exited without cancellation");
else
ErrorQueue.Enqueue($"{nameof(HandleMessages)} cancelled and exited");
}
}
private async Task SendPong(TwitchChatMessage ping)
{
var pong = TwitchChatMessage.MakePong(ping);
await SendMessageAsync(TwitchChatMessageType.PONG, ping.Parameters,
ping.MessageTags, ping.Prefix);
PrintQueue.Enqueue("Sent Pong");
}
}