Compare commits

...

3 Commits

Author SHA1 Message Date
Cameron
bda8536464 PubSub that I couldn't get to work even with a validated token. 2024-04-02 18:18:26 -05:00
Cameron
4806e50736 Added locking to Irc Callback lists 2024-03-31 19:09:12 -05:00
Cameron
1bf8afc68b Added ability to construct IRC messages from parts when sending them. 2024-03-25 05:06:51 -05:00
7 changed files with 614 additions and 32 deletions

View File

@@ -139,19 +139,68 @@ namespace TwitchIrcClient.IRC
var bytes = Encoding.UTF8.GetBytes(line + ENDL);
_Stream.Write(bytes, 0, bytes.Length);
}
//TODO make this unit testable?
/// <summary>
/// Construct an IRC message from parts and sends it. Does little to no validation on inputs.
/// </summary>
/// <param name="command"></param>
/// <param name="parameters"></param>
/// <param name="tags"></param>
/// <param name="prefix"></param>
public void SendMessage(IrcMessageType command, IEnumerable<string>? parameters = null,
Dictionary<string, string?>? tags = null, string? prefix = null)
{
var message = "";
if (tags is not null && tags.Count != 0)
{
message = "@" + string.Join(';',
tags.OrderBy(p => p.Key).Select(p => $"{p.Key}={EscapeTagValue(p.Value)}"))
+ " ";
}
if (prefix is not null && !string.IsNullOrWhiteSpace(prefix))
message += ":" + prefix + " ";
message += command.ToCommand() + " ";
if (parameters is not null && parameters.Any())
{
//if ((command == IrcMessageType.NICK || command == IrcMessageType.PASS)
// && parameters.Count() == 1)
if (false)
{
message += " " + parameters.Single();
}
else
{
message += string.Join(' ', parameters.SkipLast(1));
message += " :" + parameters.Last();
}
}
SendLine(message);
}
private static string EscapeTagValue(string? s)
{
if (s is null)
return "";
return string.Join("", s.Select(c => c switch
{
';' => @"\:",
' ' => @"\s",
'\\' => @"\\",
'\r' => @"\r",
'\n' => @"\n",
char ch => ch.ToString(),
}));
}
public void Authenticate(string? user, string? pass)
{
if (user == null)
user = $"justinfan{Random.Shared.NextInt64(10000):D4}";
if (pass == null)
pass = "pass";
SendLine($"NICK {user}");
SendLine($"PASS {pass}");
user ??= $"justinfan{Random.Shared.NextInt64(10000):D4}";
pass ??= "pass";
SendMessage(IrcMessageType.PASS, parameters: [pass]);
SendMessage(IrcMessageType.NICK, parameters: [user]);
}
public void JoinChannel(string channel)
{
channel = channel.TrimStart('#');
SendLine($"JOIN #{channel}");
SendMessage(IrcMessageType.JOIN, ["#" + channel]);
}
private async void ListenForInput()
{
@@ -268,21 +317,25 @@ namespace TwitchIrcClient.IRC
public void AddCallback(MessageCallbackItem callbackItem)
{
ObjectDisposedException.ThrowIf(disposedValue, this);
lock (UserCallbacks)
UserCallbacks.Add(callbackItem);
}
public bool RemoveCallback(MessageCallbackItem callbackItem)
{
ObjectDisposedException.ThrowIf(disposedValue, this);
lock (UserCallbacks)
return UserCallbacks.Remove(callbackItem);
}
protected void AddSystemCallback(MessageCallbackItem callbackItem)
{
ObjectDisposedException.ThrowIf(disposedValue, this);
lock (SystemCallbacks)
SystemCallbacks.Add(callbackItem);
}
protected bool RemoveSystemCallback(MessageCallbackItem callbackItem)
{
ObjectDisposedException.ThrowIf(disposedValue, this);
lock (SystemCallbacks)
return SystemCallbacks.Remove(callbackItem);
}
private void RunCallbacks(ReceivedMessage message)
@@ -290,7 +343,9 @@ namespace TwitchIrcClient.IRC
ArgumentNullException.ThrowIfNull(message, nameof(message));
if (disposedValue)
return;
lock (SystemCallbacks)
SystemCallbacks.ForEach(c => c.TryCall(this, message));
lock (UserCallbacks)
UserCallbacks.ForEach(c => c.TryCall(this, message));
}
@@ -306,6 +361,7 @@ namespace TwitchIrcClient.IRC
TokenSource.Dispose();
Client?.Dispose();
_HeartbeatTimer?.Dispose();
_Stream?.Dispose();
}
disposedValue = true;
}

View File

@@ -6,6 +6,9 @@ using System.Threading.Tasks;
namespace TwitchIrcClient.IRC
{
/// <summary>
/// Represents the "command" of an IRC message.
/// </summary>
public enum IrcMessageType
{
//twitch standard messages
@@ -174,12 +177,26 @@ namespace TwitchIrcClient.IRC
}
public static class IrcMessageTypeHelper
{
//parses a string that is either a numeric code or the command name
/// <summary>
/// Parses a string that is either a numeric code or the command name.
/// </summary>
/// <param name="s"></param>
/// <returns></returns>
/// <remarks>
/// The value range 000-999 is reserved for numeric commands, and will
/// be converted to a numeric string when forming a message.
/// </remarks>
public static IrcMessageType Parse(string s)
{
if (int.TryParse(s, out int result))
return (IrcMessageType)result;
return Enum.Parse<IrcMessageType>(s);
}
public static string ToCommand(this IrcMessageType type)
{
if ((int)type >= 0 && (int)type < 1000)
return $"{(int)type,3}";
return type.ToString();
}
}
}

View File

@@ -9,11 +9,9 @@ RateLimiter limiter = new(20, 30);
bool ssl = true;
async Task<IrcConnection> CreateConnection(string channel)
{
IrcConnection connection;
if (ssl)
connection = new IrcConnection("irc.chat.twitch.tv", 6697, limiter, true, true);
else
connection = new IrcConnection("irc.chat.twitch.tv", 6667, limiter, true, false);
IrcConnection connection = ssl
? connection = new IrcConnection("irc.chat.twitch.tv", 6697, limiter, true, true)
: connection = new IrcConnection("irc.chat.twitch.tv", 6667, limiter, true, false);
connection.AddCallback(new MessageCallbackItem(
(o, m) =>
{
@@ -51,7 +49,7 @@ async Task<IrcConnection> CreateConnection(string channel)
}
Console.Write("Channel: ");
var channelName = Console.ReadLine();
ArgumentNullException.ThrowIfNull(channelName, nameof(Channel));
ArgumentNullException.ThrowIfNullOrWhiteSpace(channelName, nameof(channelName));
var connection = await CreateConnection(channelName);
while (true)
{

View File

@@ -0,0 +1,133 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading.Tasks;
using System.Xml.Linq;
namespace TwitchIrcClient.PubSub.Message
{
public class PubSubMessage : IDictionary<string, JsonNode?>
{
private readonly JsonObject Node;
public string TypeString
{
get => (Node["type"] ?? throw new InvalidDataException()).ToJsonString();
set
{
Node["type"] = value;
}
}
//PING and PONG messages don't seem to have any data member
public string? DataString =>
Node["data"]?.ToJsonString();
public string? Nonce
{
get => Node["nonce"]?.ToJsonString();
set
{
Node["nonce"] = value;
}
}
private PubSubMessage(JsonObject node)
{
Node = node;
}
public PubSubMessage() : this(new JsonObject())
{
}
public string Serialize()
{
return Node.ToJsonString();
}
public static PubSubMessage Parse(string s)
{
var obj = JsonNode.Parse(s)
?? throw new InvalidDataException();
var psm = new PubSubMessage(obj as JsonObject
?? throw new InvalidOperationException());
return psm;
}
public static PubSubMessage PING()
{
return new PubSubMessage
{
["type"] = "PING",
};
}
#region IDictionary<string, JsonNode?>
public ICollection<string> Keys => ((IDictionary<string, JsonNode?>)Node).Keys;
public ICollection<JsonNode?> Values => ((IDictionary<string, JsonNode?>)Node).Values;
public int Count => Node.Count;
public bool IsReadOnly => ((ICollection<KeyValuePair<string, JsonNode?>>)Node).IsReadOnly;
public JsonNode? this[string key] { get => ((IDictionary<string, JsonNode?>)Node)[key]; set => ((IDictionary<string, JsonNode?>)Node)[key] = value; }
public void Add(string key, JsonNode? value)
{
Node.Add(key, value);
}
public bool ContainsKey(string key)
{
return Node.ContainsKey(key);
}
public bool Remove(string key)
{
return Node.Remove(key);
}
public bool TryGetValue(string key, [MaybeNullWhen(false)] out JsonNode? value)
{
return ((IDictionary<string, JsonNode?>)Node).TryGetValue(key, out value);
}
public void Add(KeyValuePair<string, JsonNode?> item)
{
Node.Add(item);
}
public void Clear()
{
Node.Clear();
}
public bool Contains(KeyValuePair<string, JsonNode?> item)
{
return ((ICollection<KeyValuePair<string, JsonNode?>>)Node).Contains(item);
}
public void CopyTo(KeyValuePair<string, JsonNode?>[] array, int arrayIndex)
{
((ICollection<KeyValuePair<string, JsonNode?>>)Node).CopyTo(array, arrayIndex);
}
public bool Remove(KeyValuePair<string, JsonNode?> item)
{
return ((ICollection<KeyValuePair<string, JsonNode?>>)Node).Remove(item);
}
public IEnumerator<KeyValuePair<string, JsonNode?>> GetEnumerator()
{
return Node.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return ((IEnumerable)Node).GetEnumerator();
}
#endregion //IDictionary<string, JsonNode?>
}
}

View File

@@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TwitchIrcClient.PubSub.Message;
namespace TwitchIrcClient.PubSub
{
public delegate void PubSubCallback(PubSubMessage message, PubSubConnection connection);
public record struct PubSubCallbackItem(PubSubCallback Callback, IList<string>? Types)
{
public readonly bool MaybeRunCallback(PubSubMessage message, PubSubConnection connection)
{
if (Types is null || Types.Contains(message.TypeString))
{
Callback?.Invoke(message, connection);
return true;
}
return false;
}
}
}

View File

@@ -0,0 +1,355 @@
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http.Headers;
using System.Net.Security;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Reflection.Metadata.Ecma335;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading.Tasks;
using System.Web;
using TwitchIrcClient.PubSub.Message;
namespace TwitchIrcClient.PubSub
{
public sealed class PubSubConnection : IDisposable
{
//private TcpClient Client = new();
//private SslStream SslStream;
//private WebSocket Socket;
private ClientWebSocket Socket = new();
private CancellationTokenSource TokenSource = new();
private string? ClientId;
private string? AuthToken;
private DateTime? AuthExpiration;
public string RefreshToken { get; private set; }
public PubSubConnection()
{
}
//this needs to be locked for thread-safety
public async Task SendMessageAsync(string message)
{
await Socket.SendAsync(Encoding.UTF8.GetBytes(message), WebSocketMessageType.Text,
WebSocketMessageFlags.EndOfMessage | WebSocketMessageFlags.DisableCompression,
TokenSource.Token);
}
public async Task SendMessageAsync(PubSubMessage message)
{
await SendMessageAsync(message.Serialize());
}
public async Task<bool> ConnectAsync()
{
const string url = "wss://pubsub-edge.twitch.tv";
await Socket.ConnectAsync(new Uri(url), TokenSource.Token);
if (Socket.State != WebSocketState.Open)
return false;
_ = Task.Run(HandlePings, TokenSource.Token);
_ = Task.Run(HandleIncomingMessages, TokenSource.Token);
return true;
}
public async Task<bool> GetImplicitTokenAsync(string clientId, string clientSecret,
IEnumerable<string> scopes)
{
const int PORT = 17563;
using var listener = new TcpListener(System.Net.IPAddress.Any, PORT);
listener.Start();
//using var client = new HttpClient();
var scopeString = string.Join(' ', scopes);
var stateNonce = MakeNonce();
var url = $"https://id.twitch.tv/oauth2/authorize" +
$"?response_type=code" +
$"&client_id={HttpUtility.UrlEncode(clientId)}" +
$"&redirect_uri=http://localhost:{PORT}" +
$"&scope={HttpUtility.UrlEncode(scopeString)}" +
$"&state={stateNonce}";
var startInfo = new ProcessStartInfo()
{
//FileName = "explorer",
//Arguments = url,
FileName = url,
UseShellExecute = true,
};
Process.Start(startInfo);
//Console.WriteLine(url);
using var socket = await listener.AcceptSocketAsync(TokenSource.Token);
var arr = new byte[2048];
var buffer = new ArraySegment<byte>(arr);
var count = await socket.ReceiveAsync(buffer, TokenSource.Token);
var http204 = "HTTP/1.1 204 No Content\r\n\r\n";
var sentCount = await socket.SendAsync(Encoding.UTF8.GetBytes(http204));
var resp = Encoding.UTF8.GetString(arr, 0, count);
var dict =
//get the first line of HTTP response
HttpUtility.UrlDecode(resp.Split("\r\n").First()
//extract location component (trim leading /?)
.Split(' ').ElementAt(1)[2..])
//make a dictionary
.Split('&').Select(s =>
{
var p = s.Split('=');
return new KeyValuePair<string, string>(p[0], p[1]);
}).ToDictionary();
if (dict["state"] != stateNonce)
return false;
var payload = DictToBody(new Dictionary<string,string>
{
["client_id"] = clientId,
["client_secret"] = clientSecret,
["code"] = dict["code"],
["grant_type"] = "authorization_code",
["redirect_uri"] = $"http://localhost:{PORT}",
});
var client = new HttpClient();
var startTime = DateTime.Now;
var httpResp = await client.PostAsync("https://id.twitch.tv/oauth2/token",
new StringContent(payload, new MediaTypeHeaderValue("application/x-www-form-urlencoded")));
if (httpResp is null)
return false;
if (httpResp.Content is null)
return false;
if (!httpResp.IsSuccessStatusCode)
return false;
var respStr = await httpResp.Content.ReadAsStringAsync();
var json = JsonNode.Parse(respStr);
string authToken;
double expiresIn;
string refreshToken;
if ((json?["access_token"]?.AsValue().TryGetValue(out authToken) ?? false)
&& (json?["expires_in"]?.AsValue().TryGetValue(out expiresIn) ?? false)
&& (json?["refresh_token"]?.AsValue().TryGetValue(out refreshToken) ?? false))
{
AuthToken = authToken;
RefreshToken = refreshToken;
AuthExpiration = startTime.AddSeconds(expiresIn);
ClientId = clientId;
return true;
}
return false;
}
private static string DictToBody(IEnumerable<KeyValuePair<string,string>> dict)
{
return string.Join('&', dict.Select(p =>
HttpUtility.UrlEncode(p.Key) + '=' + HttpUtility.UrlEncode(p.Value)));
}
public async Task<bool> GetDcfTokenAsync(string clientId, IEnumerable<string> scopes)
{
using var client = new HttpClient();
var scopeString = string.Join(',', scopes);
var startTime = DateTime.Now;
var resp = await client.PostAsync("https://id.twitch.tv/oauth2/device",
new StringContent($"client_id={clientId}&scopes={scopeString}",
MediaTypeHeaderValue.Parse("application/x-www-form-urlencoded")));
if (resp is null)
return false;
if (!resp.IsSuccessStatusCode)
return false;
if (resp.Content is null)
return false;
var contentString = await resp.Content.ReadAsStringAsync();
var json = JsonNode.Parse(contentString);
if (json is null)
return false;
throw new NotImplementedException();
}
public async Task<bool> GetTokenAsync(string clientId, string clientSecret)
{
using var client = new HttpClient();
var startTime = DateTime.Now;
var resp = await client.PostAsync($"https://id.twitch.tv/oauth2/token" +
$"?client_id={clientId}&client_secret={clientSecret}" +
$"&grant_type=client_credentials", null);
if (resp is null)
return false;
if (!resp.IsSuccessStatusCode)
return false;
if (resp.Content is null)
return false;
var json = JsonNode.Parse(await resp.Content.ReadAsStringAsync());
if (json is null)
return false;
var authToken = json["access_token"]?.GetValue<string>();
var expiresIn = json["expires_in"]?.GetValue<double>();
if (authToken is string token && expiresIn is double expires)
{
ClientId = clientId;
AuthToken = token;
AuthExpiration = startTime.AddSeconds(expires);
}
else
return false;
return true;
}
public async Task SubscribeAsync(IEnumerable<string> topics)
{
var psm = new PubSubMessage
{
["type"] = "LISTEN",
["data"] = new JsonObject
{
//TODO there's probably a cleaner way to do this
["topics"] = new JsonArray(topics.Select(t => (JsonValue)t).ToArray()),
["auth_token"] = AuthToken,
},
["nonce"] = MakeNonce(),
};
await SendMessageAsync(psm);
}
//TODO change or dupe this to get multiple at once
public async Task<string?> GetChannelIdFromNameAsync(string channelName)
{
using var client = new HttpClient();
client.DefaultRequestHeaders.Add("Authorization", $"Bearer {AuthToken}");
client.DefaultRequestHeaders.Add("Client-Id", ClientId);
var resp = await client.GetAsync($"https://api.twitch.tv/helix/users?login={channelName}");
if (resp is null)
return null;
if (!resp.IsSuccessStatusCode)
return null;
if (resp.Content is null)
return null;
var json = JsonNode.Parse(await resp.Content.ReadAsStringAsync());
if (json is null)
return null;
var arr = json["data"];
if (arr is null)
return null;
JsonArray jarr;
try
{
jarr = arr.AsArray();
}
catch (InvalidOperationException)
{
return null;
}
var item = jarr.SingleOrDefault();
if (item is null)
return null;
return item["id"]?.ToString();
}
private static string MakeNonce(int length = 16)
{
var buffer = new byte[length * 2];
Random.Shared.NextBytes(buffer);
return Convert.ToHexString(buffer);
}
private AutoResetEvent PingResetEvent = new(false);
private async Task HandlePings()
{
//send ping every <5 minutes
//wait until pong or >10 seconds
//raise error if necessary
AddSystemCallback(new PubSubCallbackItem(
(m, s) =>
{
s.PingResetEvent.Set();
}, ["PONG"]));
while (true)
{
await Task.Delay(TimeSpan.FromMinutes(4 * Jitter(0.05)));
await SendMessageAsync(PubSubMessage.PING());
await Task.Delay(TimeSpan.FromSeconds(10));
if (!PingResetEvent.WaitOne(0))
{
//timeout
}
}
}
private async void HandleIncomingMessages()
{
string s = "";
while (true)
{
var buffer = new ArraySegment<byte>(new byte[4096]);
var result = await Socket.ReceiveAsync(buffer, TokenSource.Token);
s += Encoding.UTF8.GetString(buffer.Take(result.Count).ToArray());
if (result.EndOfMessage)
{
IncomingMessage(PubSubMessage.Parse(s));
s = "";
}
}
}
private void IncomingMessage(PubSubMessage message)
{
RunCallbacks(message);
}
private void RunCallbacks(PubSubMessage message)
{
ArgumentNullException.ThrowIfNull(message);
if (disposedValue)
return;
lock (SystemCallbacks)
SystemCallbacks.ForEach(c => c.MaybeRunCallback(message, this));
lock (UserCallbacks)
UserCallbacks.ForEach(c => c.MaybeRunCallback(message, this));
}
private readonly List<PubSubCallbackItem> UserCallbacks = [];
public void AddCallback(PubSubCallbackItem callback)
{
ObjectDisposedException.ThrowIf(disposedValue, this);
lock (UserCallbacks)
UserCallbacks.Add(callback);
}
public bool RemoveCallback(PubSubCallbackItem callback)
{
ObjectDisposedException.ThrowIf(disposedValue, this);
lock (UserCallbacks)
return UserCallbacks.Remove(callback);
}
private readonly List<PubSubCallbackItem> SystemCallbacks = [];
private void AddSystemCallback(PubSubCallbackItem callback)
{
ObjectDisposedException.ThrowIf(disposedValue, this);
lock (SystemCallbacks)
SystemCallbacks.Add(callback);
}
private bool RemoveSystemCallback(PubSubCallbackItem callback)
{
ObjectDisposedException.ThrowIf(disposedValue, this);
lock (SystemCallbacks)
return SystemCallbacks.Remove(callback);
}
/// <summary>
/// produces a number between -limit and limit
/// </summary>
/// <param name="limit"></param>
/// <returns></returns>
private static double Jitter(double limit)
{
return (Random.Shared.NextDouble() - 0.5) * 2 * limit;
}
#region Dispose
private bool disposedValue;
private void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
//Client?.Dispose();
//SslStream?.Dispose();
Socket?.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
#endregion //Dispose
}
}