Now successfully connects, authenticates, joins a channel and parses messages.

This commit is contained in:
Ikatono
2024-03-18 05:55:25 -05:00
parent 7051cd28f1
commit 6a5d960b3d
5 changed files with 666 additions and 50 deletions

View File

@@ -2,8 +2,10 @@
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Linq.Expressions;
using System.Net.Sockets;
using System.Reflection.Metadata;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -11,79 +13,72 @@ using System.Timers;
namespace TwitchLogger.IRC
{
public class IrcConnection : IDisposable
/// <summary>
/// Connects to a single Twitch chat channel via limited IRC implementation.
///
/// </summary>
/// <param name="url"></param>
/// <param name="port"></param>
public class IrcConnection(string url, int port) : IDisposable
{
public static readonly string ENDL = "\r\n";
public int Port { get; }
public string Url { get; }
public int Port { get; } = port;
public string Url { get; } = url;
public bool Connected { get; } = false;
public event EventHandler? onTimeout;
private Socket Socket = new(SocketType.Stream, ProtocolType.Tcp);
private CancellationTokenSource CancellationTokenSource = new();
private Thread? ListenerThread;
private TcpClient Client = new();
private NetworkStream Stream => Client.GetStream();
private CancellationTokenSource TokenSource = new();
private RateLimiter? Limiter;
private Task? ListenerTask;
public IrcConnection(string url, int port)
{
Url = url;
Port = port;
}
public async Task<bool> ConnectAsync()
{
if (Connected)
return true;
await Socket.ConnectAsync(Url, Port);
if (!Socket.Connected)
Client.NoDelay = true;
await Client.ConnectAsync(Url, Port);
if (!Client.Connected)
return false;
ListenerThread = new(() => ListenForInput(CancellationTokenSource.Token));
ListenerThread.Start();
ListenerTask = Task.Run(() => ListenForInput(TokenSource.Token), TokenSource.Token);
return true;
}
public void Disconnect()
{
CancellationTokenSource.Cancel();
throw new NotImplementedException();
TokenSource.Cancel();
}
public void SendLine(string line)
{
int sent = Socket.Send(Encoding.UTF8.GetBytes(line + ENDL));
Limiter?.WaitForAvailable();
if (TokenSource.IsCancellationRequested)
return;
Stream.Write(new Span<byte>(Encoding.UTF8.GetBytes(line + ENDL)));
}
public bool Authenticate(string user, string pass)
public void Authenticate(string user, string pass)
{
throw new NotImplementedException();
SendLine($"NICK {user}");
SendLine($"PASS {pass}");
}
private void ListenForInput(CancellationToken token)
private async void ListenForInput(CancellationToken token)
{
using AutoResetEvent ARE = new(false);
while (true)
byte[] buffer = new byte[5 * 1024];
while (!token.IsCancellationRequested)
{
SocketAsyncEventArgs args = new();
args.Completed += (sender, e) =>
{
onDataReceived(e);
ARE.Set();
};
bool started = Socket.ReceiveAsync(args);
while (true)
{
bool reset = ARE.WaitOne(100);
if (reset)
break;
//returning ends the thread running this
if (token.IsCancellationRequested)
return;
}
var bytesRead = await Stream.ReadAsync(buffer, 0, buffer.Length, TokenSource.Token);
if (bytesRead > 0)
onDataReceived(buffer, bytesRead);
if (!Stream.CanRead)
return;
}
token.ThrowIfCancellationRequested();
}
private string _ReceivedDataBuffer = "";
private void onDataReceived(SocketAsyncEventArgs args)
private void onDataReceived(byte[] buffer, int length)
{
if (args.SocketError != SocketError.Success)
throw new SocketException((int)args.SocketError, $"Socket Error: {args.SocketError}");
if (args.Buffer is null)
throw new ArgumentNullException();
string receivedString = Encoding.UTF8.GetString(args.Buffer, args.Offset, args.BytesTransferred);
string receivedString = Encoding.UTF8.GetString(buffer, 0, length);
_ReceivedDataBuffer += receivedString;
string[] lines = _ReceivedDataBuffer.Split(ENDL);
//if last line is terminated, there should be an empty string at the end of "lines"
@@ -93,34 +88,64 @@ namespace TwitchLogger.IRC
}
private void onLineReceived(string line)
{
throw new NotImplementedException();
if (string.IsNullOrWhiteSpace(line))
return;
var message = ReceivedMessage.Parse(line);
HeartbeatReceived();
//PONG must be sent automatically
if (message.MessageType == IrcMessageType.PING)
SendLine($"PONG :{message.Source} {message.RawParameters}");
RunCallbacks(message);
}
private System.Timers.Timer _HeartbeatTimer = new();
//TODO consider changing to a System.Threading.Timer, I'm not sure
//if it's a better fit
private readonly System.Timers.Timer _HeartbeatTimer = new();
private void InitializeHeartbeat(int millis)
{
ObjectDisposedException.ThrowIf(disposedValue, GetType());
_HeartbeatTimer.AutoReset = false;
_HeartbeatTimer.Interval = millis;
_HeartbeatTimer.Elapsed += HeartbeatTimedOut;
_HeartbeatTimer.Start();
}
private void HeartbeatReceived()
{
throw new NotImplementedException();
if (disposedValue)
return;
_HeartbeatTimer.Stop();
_HeartbeatTimer.Start();
}
private void HeartbeatTimedOut(object? caller, ElapsedEventArgs e)
{
onTimeout?.Invoke(this, new());
if (disposedValue)
return;
onTimeout?.Invoke(this, EventArgs.Empty);
}
private List<CallbackItem> Callbacks = [];
public void AddCallback(CallbackItem callbackItem)
=> Callbacks.Add(callbackItem);
public bool RemoveCallback(CallbackItem callbackItem)
=> Callbacks.Remove(callbackItem);
private void RunCallbacks(ReceivedMessage message)
{
ArgumentNullException.ThrowIfNull(message, nameof(message));
Callbacks.ForEach(c => c.TryCall(message));
}
#region Dispose
private bool disposedValue;
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
CancellationTokenSource.Cancel();
TokenSource.Cancel();
if (disposing)
{
Socket?.Dispose();
TokenSource.Dispose();
Client?.Dispose();
_HeartbeatTimer?.Dispose();
Limiter?.Dispose();
}
disposedValue = true;
}
@@ -131,5 +156,99 @@ namespace TwitchLogger.IRC
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
#endregion //Dispose
private class RateLimiter : IDisposable
{
private SemaphoreSlim Semaphore;
private System.Timers.Timer Timer;
public int MessageLimit { get; }
public int Seconds { get; }
private CancellationToken Token { get; }
public RateLimiter(int messages, int seconds, CancellationToken token)
{
Semaphore = new(messages, messages);
Timer = new(TimeSpan.FromSeconds(seconds));
MessageLimit = messages;
Seconds = seconds;
Token = token;
Timer.AutoReset = true;
Timer.Elapsed += ResetLimit;
Timer.Start();
}
public void WaitForAvailable()
{
try
{
Semaphore.Wait(Token);
}
catch (OperationCanceledException)
{
//caller is responsible for checking whether connection is cancelled before trying to send
}
}
public bool WaitForAvailable(TimeSpan timeout)
{
try
{
return Semaphore.Wait(timeout, Token);
}
catch (OperationCanceledException)
{
return false;
}
}
public bool WaitForAvailable(int millis)
{
try
{
return Semaphore.Wait(millis, Token);
}
catch (OperationCanceledException)
{
return false;
}
}
private void ResetLimit(object? sender, EventArgs e)
{
try
{
Semaphore.Release(MessageLimit);
}
catch (SemaphoreFullException)
{
}
catch (ObjectDisposedException)
{
}
}
#region RateLimiter Dispose
private bool disposedValue;
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
Semaphore?.Dispose();
Timer?.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
#endregion //RateLimiter Dispose
}
}
}