Source Code
You can find the fully functional app with .NET Aspire — including Redis, Avalonia, three ASP.NET Core load balanced apps, and a bonus SignalR with SignalR backplane (yes, it’s not a typo!) — in the R3Async Playground repository.
Intro
First of all, I’d like to ask a question: which API do you prefer?
void InvokeSomeApi(Request request, Action<Response> onCompleted);
Task<Response> InvokeSomeApiAsync(Request request, CancellationToken cancellationToken);
Of course, you chose the second one.
- We have built-in language support with await.
- No need to deal with callbacks manually.
- A standard, built-in way to support cancellation.
- A standard way to control continuation behavior (TaskAwaiter).
- Task already solves this problem in a well-established way; we shouldn’t reinvent it every time.
Second question: which one do you prefer?
record Notification;
interface IConsumer
{
public Task OnNotification(Notification notification, CancellationToken cancellationToken);
}
interface IProducer
{
void Subscribe(IConsumer client);
void Unsubscribe(IConsumer client);
}
record Notification;
interface IProducer
{
IAsyncEnumerable<Notification> GetNotificationStream();
}
I would have no doubts:
- We have built-in language support (await foreach).
- We have a standard way to handle unsubscription via .WithCancellation(cancellationToken).
- We get LINQ support (in .NET 10 and later).
- We don’t need to introduce an explicit IConsumer interface.
Final question: So why is the first approach still the default for many .NET developers using SignalR?
I think it often comes down to a lack of familiarity with reactive patterns.
Would you ever think about enumerating a list without using IEnumerable? That would be crazy: we would be reinventing enumerables every time.
So why on earth are we reinventing streams over and over again?
We already have basically three stream patterns in .NET:
- IAsyncEnumerable<T>: Pull-based async stream. Works naturally with await foreach.
- Observable<T>: Push-based synchronous stream. See R3.
- AsyncObservable<T>: Push-based asynchronous stream. See R3Async.
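To make "pull-based" concrete, here is a minimal sketch that uses only the base class library. The names PullStreamDemo, Ticks and TakeAsync are made up for this illustration. The producer does nothing until the consumer asks for the next item, and leaving the loop is all it takes to stop it.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class PullStreamDemo
{
    // Hypothetical producer: an endless counter.
    // Nothing runs until a consumer asks for the next item.
    public static async IAsyncEnumerable<int> Ticks(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; ; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return i;
            await Task.Yield(); // stand-in for real asynchronous work
        }
    }

    // Consumer: pulls items one at a time and stops after `count` of them.
    public static async Task<List<int>> TakeAsync(
        int count, CancellationToken cancellationToken = default)
    {
        var result = new List<int>();
        if (count <= 0) return result;

        await foreach (var tick in Ticks().WithCancellation(cancellationToken))
        {
            result.Add(tick);
            if (result.Count == count) break; // leaving the loop disposes the enumerator
        }
        return result;
    }
}
```

With a push-based stream the roles are reversed: the producer decides when an item is delivered and the consumer only reacts to it.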
Create a (Reactive) Realtime Messaging App with SignalR
So, how can we get started? SignalR supports streaming.
All we need to do is create a hub and return a stream, like this:
public class AppHub(IChatService chatService) : Hub
{
// Returns a stream of messages for the specified ChatRoomId
public async IAsyncEnumerable<ChatMessage> GetChatMessages(
ChatRoomId roomId,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var message in todo.WithCancellation(cancellationToken))
{
yield return message;
}
}
// Joins a room and broadcasts messages sent by the client to other users
public async Task JoinRoom(
ChatRoomId roomId,
string user,
IAsyncEnumerable<ChatMessage> messages,
CancellationToken cancellationToken)
{
await foreach (var message in messages.WithCancellation(cancellationToken))
{
// TODO: broadcast message to other users
}
}
}
Here, we’re omitting how the todo stream is implemented — the focus is on how SignalR handles the streaming endpoint.
- The CancellationToken is tied to the client connection (or more accurately, to the client-side enumeration and connection).
- yield return will push data into the socket automatically.
In this way, we’re essentially abstracting away SignalR — the client just consumes a standard IAsyncEnumerable.
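The same cancellation mechanics can be observed without SignalR at all. The sketch below runs in-process and uses hypothetical names (CancellationDemo, Messages, ConsumeUntilCancelledAsync); it only assumes that the token reaching the streaming method is cancelled when the consumer stops, which is what the hub method above relies on. Once the token fires, the producer's finally block runs, so that is the natural place to release resources.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Set by the producer's finally block, so we can observe the cleanup.
    public static bool ProducerCleanedUp;

    // Hypothetical endless producer, similar in shape to a streaming hub method.
    public static async IAsyncEnumerable<string> Messages(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        try
        {
            var i = 0;
            while (true)
            {
                yield return $"message {i++}";
                // Throws OperationCanceledException once the token is cancelled.
                await Task.Delay(10, cancellationToken);
            }
        }
        finally
        {
            ProducerCleanedUp = true; // release resources here
        }
    }

    // Consumer: cancels its own token after `cancelAfter` messages.
    public static async Task<int> ConsumeUntilCancelledAsync(int cancelAfter)
    {
        ProducerCleanedUp = false;
        using var cts = new CancellationTokenSource();
        var received = 0;
        try
        {
            await foreach (var message in Messages().WithCancellation(cts.Token))
            {
                if (++received == cancelAfter) cts.Cancel();
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the producer observed the cancellation.
        }
        return received;
    }
}
```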
Client Implementation with Avalonia
For the client, we’re using Avalonia.
Our goals:
- Share a single SignalR connection across the entire app.
- Dynamically create and dispose of the connection as needed.
- Keep track of usage to know when to dispose — basically a reference counting system.
A good solution is to use RefCountLazy<T> from R3Async, which lets us lazily create the connection and automatically dispose it when no one is using it anymore.
public sealed class SignalRHubClient
{
readonly RefCountLazy<HubConnection> _sharedConnection;
public SignalRHubClient()
{
_sharedConnection = new(async token =>
{
var connection = new HubConnectionBuilder()
.WithUrl("http://localhost:5062/hub")
.Build();
await connection.StartAsync(token);
_isConnected.Value = true;
return new()
{
Value = connection,
Disposable = AsyncDisposable.Create(async () =>
{
await connection.DisposeAsync();
_isConnected.Value = false;
})
};
});
}
readonly Signal<bool> _isConnected = new(false);
public ISignal<bool> IsConnected => _isConnected;
public async IAsyncEnumerable<ChatMessage> GetChatMessages(ChatRoomId roomId, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var hubRef = await _sharedConnection.GetAsync(cancellationToken);
var connection = hubRef.Value;
var stream = connection.StreamAsync<ChatMessage>("GetChatMessages", roomId, cancellationToken);
await foreach (var message in stream)
{
yield return message;
}
}
public async ValueTask JoinRoom(ChatRoomId roomId, string user, IAsyncEnumerable<ChatMessage> messages, CancellationToken cancellationToken = default)
{
await using var hubRef = await _sharedConnection.GetAsync(cancellationToken);
await hubRef.Value.InvokeAsync("JoinRoom", roomId, user, messages, cancellationToken: cancellationToken);
}
}
Basically, RefCountLazy<T> is a thread-safe helper that tracks references to our Hub connection.
- GetAsync() returns a "disposable reference" to the connection.
- Calling DisposeAsync() on that reference decrements the reference count.
- When the reference count reaches 0, the connection is automatically disposed.
- When the reference count goes from 0 to 1, the connection is created again, and so on.
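If you are curious how such a helper can work, here is a rough sketch of the idea. This is not R3Async's implementation: SimpleRefCountLazy, Lease and the create/destroy delegates are names invented for this example, and the real RefCountLazy<T> has a different API (see the code above).

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only: NOT the R3Async implementation of RefCountLazy<T>.
public sealed class SimpleRefCountLazy<T> where T : class
{
    readonly Func<CancellationToken, Task<T>> _create;
    readonly Func<T, ValueTask> _destroy;
    readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    T? _value;
    int _refCount;

    public SimpleRefCountLazy(Func<CancellationToken, Task<T>> create, Func<T, ValueTask> destroy)
    {
        _create = create;
        _destroy = destroy;
    }

    public async Task<Lease> GetAsync(CancellationToken cancellationToken = default)
    {
        await _gate.WaitAsync(cancellationToken);
        try
        {
            if (_refCount == 0)
                _value = await _create(cancellationToken); // 0 -> 1: create the resource
            _refCount++;
            return new Lease(this, _value!);
        }
        finally { _gate.Release(); }
    }

    async ValueTask ReleaseAsync()
    {
        await _gate.WaitAsync();
        try
        {
            if (--_refCount == 0)
            {
                var value = _value!;
                _value = null;
                await _destroy(value); // 1 -> 0: dispose the resource
            }
        }
        finally { _gate.Release(); }
    }

    // The "disposable reference" handed out to callers.
    public sealed class Lease : IAsyncDisposable
    {
        readonly SimpleRefCountLazy<T> _owner;
        int _released;

        internal Lease(SimpleRefCountLazy<T> owner, T value)
        {
            _owner = owner;
            Value = value;
        }

        public T Value { get; }

        // Releasing twice must not decrement the count twice.
        public ValueTask DisposeAsync() =>
            Interlocked.Exchange(ref _released, 1) == 0 ? _owner.ReleaseAsync() : default;
    }
}
```

The semaphore serializes creation and disposal, so two callers arriving at the same time can never create two connections.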
Effectively, we are moving the IAsyncEnumerable from the server to the client!
ViewModel Implementation
Here we use R3 and SignalsDotnet to automatically subscribe and unsubscribe to IAsyncEnumerable streams based on property changes.
public class ChatViewModel
{
readonly SignalRHubClient _hubClient = new();
public ChatViewModel(IReadOnlySignal<string> userName)
{
var computedFactory = ComputedSignalFactory.Default
.DisconnectEverythingWhen(Disconnected.Values);
// Command to send messages
SendCommand = computedFactory
.ComputedObservable(
() => _hubClient.IsConnected.Value && !string.IsNullOrWhiteSpace(DraftMessage.Value),
() => new(false))
.ToReactiveCommand(
_ => ToSendMessage.OnNext(new(userName.Value, DraftMessage.Value))
);
// Subscribe to incoming chat messages
computedFactory.AsyncEffect(async token =>
{
await foreach (var message in _hubClient.GetChatMessages(new(RoomName.Value), token))
{
AllChatMessages.Add(message);
}
}, ConcurrentChangeStrategy.CancelCurrent);
// Join the chat room and broadcast outgoing messages
computedFactory.AsyncEffect(async token =>
{
await _hubClient.JoinRoom(
new(RoomName.Value),
userName.Value,
ToSendMessage.TakeUntil(token).ToAsyncEnumerable(),
token
);
}, ConcurrentChangeStrategy.CancelCurrent);
// Clear draft message after sending
ToSendMessage.Subscribe(_ => DraftMessage.Value = "");
}
public Signal<bool> Disconnected { get; } = new(true);
public Signal<string> DraftMessage { get; } = new();
public Subject<ChatMessage> ToSendMessage { get; } = new();
public Signal<string> RoomName { get; } = new("MyRoom");
public ObservableCollection<ChatMessage> AllChatMessages { get; } = new();
public ICommand SendCommand { get; }
}
See what’s happening here? We’re basically just enumerating an IAsyncEnumerable and adding items to a list — it really doesn’t get simpler than that.
SignalsDotnet takes care of all the dependency tracking for us:
- Whenever RoomName.Value changes, the CancelCurrent strategy cancels the original stream.
- This cancellation is automatically propagated to the server.
- A new stream is then enumerated for the updated room, all automatically.
- Everything is disposed automatically if the view is unloaded.
Ok, but how can we implement the backend?
Let’s go step by step.
First assumption: We have a single backend service — every client connects directly to this server.
Later, we can scale out with a Redis backplane that synchronizes messages across multiple server instances. We won’t use the built-in SignalR backplane for this; we’ll implement our own from scratch.
Understand What We Need
So, what do we actually need?
We need a way to publish messages.
Don’t worry about the underlying technology yet — focus only on the method signature and how the API should look.
So we need something like this:
public interface IPublisher<T>
{
IAsyncDisposable SubscribeAsync(Func<T, CancellationToken, ValueTask> onNextAsync);
ValueTask PublishAsync(T value, CancellationToken cancellationToken);
}
Here’s the fun part: stop trying to implement this interface yourself.
This problem is already solved and has a well-known name in the ReactiveX world — it’s called a Subject.
All we need is the async version of it, and we can use R3Async to get exactly that.
So, what we really need is an ISubject.
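Just to make the contract tangible, this is roughly what a naive version of such a publisher looks like. It is a toy under stated assumptions: NaivePublisher is a made-up name, it mirrors the IPublisher<T> signature above rather than R3Async's ISubject<T>, and it ignores completion and error handling entirely.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only: a naive stand-in for an async subject, not R3Async's ISubject<T>.
public sealed class NaivePublisher<T>
{
    readonly object _lock = new object();
    List<Func<T, CancellationToken, ValueTask>> _handlers =
        new List<Func<T, CancellationToken, ValueTask>>();

    public IAsyncDisposable SubscribeAsync(Func<T, CancellationToken, ValueTask> onNextAsync)
    {
        lock (_lock)
        {
            // Copy-on-write, so PublishAsync can iterate a stable snapshot.
            _handlers = new List<Func<T, CancellationToken, ValueTask>>(_handlers) { onNextAsync };
        }
        return new Subscription(this, onNextAsync);
    }

    public async ValueTask PublishAsync(T value, CancellationToken cancellationToken)
    {
        List<Func<T, CancellationToken, ValueTask>> snapshot;
        lock (_lock) { snapshot = _handlers; }

        // Sequential delivery: each subscriber is awaited before the next one.
        foreach (var handler in snapshot)
            await handler(value, cancellationToken);
    }

    void Remove(Func<T, CancellationToken, ValueTask> handler)
    {
        lock (_lock)
        {
            var copy = new List<Func<T, CancellationToken, ValueTask>>(_handlers);
            copy.Remove(handler);
            _handlers = copy;
        }
    }

    sealed class Subscription : IAsyncDisposable
    {
        readonly NaivePublisher<T> _owner;
        readonly Func<T, CancellationToken, ValueTask> _handler;

        public Subscription(NaivePublisher<T> owner, Func<T, CancellationToken, ValueTask> handler)
        {
            _owner = owner;
            _handler = handler;
        }

        public ValueTask DisposeAsync()
        {
            _owner.Remove(_handler);
            return default;
        }
    }
}
```

Even this toy needs locking, snapshots and unsubscription logic, which is a good reason to lean on an existing subject instead of writing one per project.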
But is this enough? Not quite.
- We need a subject per topic, so it’s more like:
ConcurrentDictionary<string, ISubject<T>> _publishersByTopic;
- Are we good now? Still not. We want the publisher to exist only while someone is subscribed and to be disposed automatically when no one needs it. We don't want the dictionary to hold zombie references.
That means we need reference counting again, but this time based on a key (the topic). We can use RefCountTable for this! It's basically a concurrent dictionary with a single GetOrCreateAsync method that counts how many active references each key has.
That means we need a
readonly RefCountTable<ChatRoomId, ISubject<ChatMessage>> _roomPublisherByRoomId;
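The "no zombie references" requirement is easier to picture with a toy version. The sketch below is synchronous and deliberately simplified; NaiveRefCountTable, Acquire and Lease are names invented here, and it is not how R3Async's RefCountTable is implemented. The point is only that an entry is removed from the dictionary the moment its last reference is released.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Illustrative only: a synchronous toy, not R3Async's RefCountTable.
public sealed class NaiveRefCountTable<TKey, TValue> where TKey : notnull
{
    sealed class Entry
    {
        public Entry(TValue value) { Value = value; }
        public TValue Value { get; }
        public int RefCount;
    }

    readonly Dictionary<TKey, Entry> _entries = new Dictionary<TKey, Entry>();
    readonly Func<TKey, TValue> _create;

    public NaiveRefCountTable(Func<TKey, TValue> create) { _create = create; }

    // Number of keys that currently have at least one active reference.
    public int Count { get { lock (_entries) return _entries.Count; } }

    public Lease Acquire(TKey key)
    {
        lock (_entries)
        {
            if (!_entries.TryGetValue(key, out var entry))
            {
                entry = new Entry(_create(key)); // first reference: create the value
                _entries.Add(key, entry);
            }
            entry.RefCount++;
            return new Lease(this, key, entry.Value);
        }
    }

    void Release(TKey key)
    {
        lock (_entries)
        {
            var entry = _entries[key];
            if (--entry.RefCount == 0)
                _entries.Remove(key); // last reference gone: no zombie entry is left behind
        }
    }

    public sealed class Lease : IDisposable
    {
        readonly NaiveRefCountTable<TKey, TValue> _owner;
        readonly TKey _key;
        bool _disposed;

        internal Lease(NaiveRefCountTable<TKey, TValue> owner, TKey key, TValue value)
        {
            _owner = owner;
            _key = key;
            Value = value;
        }

        public TValue Value { get; }

        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;
            _owner.Release(_key);
        }
    }
}
```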
Maybe we can hide it behind an interface like this:
public interface IChatService
{
ValueTask<IAsyncDisposableReference<ISubject<ChatMessage>>> GetOrCreateChatRoom(ChatRoomId id, CancellationToken cancellationToken);
}
Here, IAsyncDisposableReference is essentially the ref-counted reference.
Calling DisposeAsync() on it signals that we no longer need the publisher, and when the ref count reaches 0, it will automatically dispose the underlying implementation that exposes the subject.
Completing the hub
We are now ready to complete the Hub implementation:
public class AppHub(IChatService chatService) : Hub
{
public async IAsyncEnumerable<ChatMessage> GetChatMessages(ChatRoomId roomId, [EnumeratorCancellation] CancellationToken cancellationToken)
{
await using var chatRef = await chatService.GetOrCreateChatRoom(roomId, cancellationToken);
var chatMessages = chatRef.Value;
await foreach (var message in chatMessages.Values
.ToAsyncEnumerable(static () => Channel.CreateUnbounded<ChatMessage>())
.WithCancellation(cancellationToken))
{
yield return message;
}
}
public async Task JoinRoom(ChatRoomId roomId, string user, IAsyncEnumerable<ChatMessage> messages)
{
var cancellationToken = Context.ConnectionAborted;
await using var chatRef = await chatService.GetOrCreateChatRoom(roomId, cancellationToken);
var chatMessages = chatRef.Value;
await chatMessages.OnNextAsync(new ChatMessage(user, $"{user} joined the room"), cancellationToken);
try
{
await foreach (var message in messages.WithCancellation(cancellationToken))
{
await chatMessages.OnNextAsync(message, cancellationToken);
}
}
finally
{
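// The connection token may already be cancelled at this point, so don't reuse it for the farewell message
await chatMessages.OnNextAsync(new ChatMessage(user, $"{user} left the room"), CancellationToken.None);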
}
}
}
How it works
- JoinRoom will return only when the user has left the room.
- Every message from the client is published to the subject.
- GetChatMessages enumerates every notification from that subject and yields it to the client in real time.
- References from RefCountTable are easy to manage with await using statements, which automatically track the lifetime of both publishers and consumers.
- Note that we can move all of this logic out of the hub class, since the logic itself doesn't depend on SignalR. That also makes it easy to unit test.
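One detail worth a closer look is the Channel passed to ToAsyncEnumerable in GetChatMessages: a push-based source has to buffer items somewhere until the pull-based consumer asks for them. Here is a sketch of that bridge using only System.Threading.Channels. PushToPull and Broadcaster are hypothetical helpers written for this example, and R3Async's own operator may well work differently.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

public static class PushToPull
{
    // `subscribe` is a hypothetical push source: it takes a callback and
    // returns a handle that unsubscribes when disposed.
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        Func<Action<T>, IDisposable> subscribe,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Pushed items wait in the channel until the consumer pulls them.
        var channel = Channel.CreateUnbounded<T>();
        using var subscription = subscribe(item => channel.Writer.TryWrite(item));

        await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
            yield return item;
    }
}

// Tiny push source, used only to demonstrate the bridge.
public sealed class Broadcaster<T>
{
    event Action<T>? Handlers;

    public int SubscriberCount => Handlers?.GetInvocationList().Length ?? 0;

    public void Publish(T item) => Handlers?.Invoke(item);

    public IDisposable Subscribe(Action<T> handler)
    {
        Handlers += handler;
        return new Unsubscriber(() => Handlers -= handler);
    }

    sealed class Unsubscriber : IDisposable
    {
        readonly Action _onDispose;
        public Unsubscriber(Action onDispose) { _onDispose = onDispose; }
        public void Dispose() => _onDispose();
    }
}
```

An unbounded channel never blocks the publisher, at the cost of unbounded memory if a client reads too slowly. A bounded channel would trade that for backpressure.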
Ok, but how is IChatService implemented?
Here’s the first implementation: InMemory
public class InMemoryChatService : IChatService
{
readonly RefCountTable<ChatRoomId, ISubject<ChatMessage>> _chatByRoom;
public InMemoryChatService()
{
_chatByRoom = new(async (roomId, cancellationToken) =>
{
var chat = Subject.Create<ChatMessage>(new SubjectCreationOptions
{
IsStateless = false,
PublishingOption = PublishingOption.Concurrent
});
return new()
{
Value = chat,
Disposable = AsyncDisposable.Create(() => chat.OnCompletedAsync(Result.Success))
};
});
}
public ValueTask<IAsyncDisposableReference<ISubject<ChatMessage>>> GetOrCreateChatRoom(ChatRoomId id, CancellationToken cancellationToken)
{
return _chatByRoom.GetOrCreateAsync(id, cancellationToken);
}
}
Ok, this is good and all, but how do you scale it out?
Just provide another implementation of the subject (publisher)!
Let's use Redis as the backplane:
sealed class RedisSubscriberSubject<T> : ISubject<T> where T : class
{
readonly ISubscriber _subscriber;
readonly RedisChannel _channel;
public RedisSubscriberSubject(ISubscriber subscriber,
RedisChannel channel)
{
_subscriber = subscriber;
_channel = channel;
Values = AsyncObservable.Create<T>((observer, _) =>
{
var messageQueue = subscriber.Subscribe(_channel);
messageQueue.OnMessage(async message =>
{
var notification = JsonSerializer.Deserialize<Notification>((string)message.Message!)!;
await notification.ForwardTo(observer, CancellationToken.None);
});
var disposable = AsyncDisposable.Create(async () => await messageQueue.UnsubscribeAsync());
return new(disposable);
});
}
public AsyncObservable<T> Values { get; }
public ValueTask OnNextAsync(T value, CancellationToken cancellationToken) => ForwardNotification(Notification.FromOnNext(value));
public ValueTask OnErrorResumeAsync(Exception error, CancellationToken cancellationToken) => ForwardNotification(Notification.FromOnErrorResume(error));
public ValueTask OnCompletedAsync(Result result) => ForwardNotification(Notification.FromOnCompleted(result));
async ValueTask ForwardNotification(Notification notification)
{
var json = JsonSerializer.Serialize(notification);
await _subscriber.PublishAsync(_channel, json);
}
record Notification
{
public static Notification FromOnNext(T value) => new()
{
Value = value,
ErrorMessage = null,
IsCompleted = false
};
public static Notification FromOnErrorResume(Exception exception) => new()
{
Value = null,
ErrorMessage = exception.Message,
IsCompleted = false
};
public static Notification FromOnCompleted(Result result) => new()
{
Value = null,
ErrorMessage = result.Exception?.Message,
IsCompleted = true
};
public string? ErrorMessage { get; init; }
public T? Value { get; init; }
public bool IsCompleted { get; init; }
public ValueTask ForwardTo(AsyncObserver<T> observer, CancellationToken cancellationToken) => (IsCompleted, ErrorMessage) switch
{
(false, null) => observer.OnNextAsync(Value!, cancellationToken),
(false, not null) => observer.OnErrorResumeAsync(new(ErrorMessage), cancellationToken),
(true, null) => observer.OnCompletedAsync(Result.Success),
(true, not null) => observer.OnCompletedAsync(Result.Failure(new(ErrorMessage)))
};
}
}
public static class RedisSubscriberEx
{
public static ISubject<T> ToSubject<T>(this ISubscriber subscriber, RedisChannel channel) where T : class => new RedisSubscriberSubject<T>(subscriber, channel);
}
The important bits are:
AsyncObservable.Create<T>((observer, _) =>
{
var messageQueue = subscriber.Subscribe(_channel);
messageQueue.OnMessage(async message =>
{
var notification = JsonSerializer.Deserialize<Notification>((string)message.Message!)!;
await notification.ForwardTo(observer, CancellationToken.None);
});
var disposable = AsyncDisposable.Create(async () => await messageQueue.UnsubscribeAsync());
return new(disposable);
});
Here we are mapping the Redis subscription to an AsyncObservable.
For publishing to Redis:
public ValueTask OnNextAsync(T value, CancellationToken cancellationToken) => ForwardNotification(Notification.FromOnNext(value));
public ValueTask OnErrorResumeAsync(Exception error, CancellationToken cancellationToken) => ForwardNotification(Notification.FromOnErrorResume(error));
public ValueTask OnCompletedAsync(Result result) => ForwardNotification(Notification.FromOnCompleted(result));
async ValueTask ForwardNotification(Notification notification)
{
var json = JsonSerializer.Serialize(notification);
await _subscriber.PublishAsync(_channel, json);
}
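The envelope is what lets all three kinds of notification travel over a single Redis channel. The standalone sketch below reproduces the same idea with System.Text.Json; Envelope<T>, EnvelopeDemo, Describe and RoundTrip are names made up for the illustration. Note that only the error message survives the trip: the exception type and stack trace are lost, so the receiving side has to rebuild a plain exception from the text.

```csharp
#nullable enable
using System.Text.Json;

// Hypothetical stand-in for the nested Notification record above.
public sealed record Envelope<T> where T : class
{
    public T? Value { get; init; }
    public string? ErrorMessage { get; init; }
    public bool IsCompleted { get; init; }
}

public static class EnvelopeDemo
{
    // Decodes the JSON payload and reports which observer callback it maps to.
    public static string Describe<T>(string json) where T : class
    {
        var envelope = JsonSerializer.Deserialize<Envelope<T>>(json)!;
        return (envelope.IsCompleted, envelope.ErrorMessage) switch
        {
            (false, null) => $"OnNext({envelope.Value})",
            (false, not null) => $"OnErrorResume({envelope.ErrorMessage})",
            (true, null) => "OnCompleted(success)",
            (true, not null) => $"OnCompleted(failure: {envelope.ErrorMessage})",
        };
    }

    // Serializes the envelope as the publisher would, then decodes it as a subscriber would.
    public static string RoundTrip<T>(Envelope<T> envelope) where T : class =>
        Describe<T>(JsonSerializer.Serialize(envelope));
}
```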
Now we can implement our RedisChatService like this:
public class RedisChatService : IChatService
{
readonly RefCountTable<ChatRoomId, ISubject<ChatMessage>> _table;
public RedisChatService(IConnectionMultiplexer connectionMultiplexer)
{
_table = new(async (roomId, _) =>
{
var subscriber = connectionMultiplexer.GetSubscriber();
var disposable = AsyncDisposable.Create(async () => await subscriber.UnsubscribeAllAsync());
try
{
var subject = subscriber.ToSubject<ChatMessage>(new(roomId.Name, RedisChannel.PatternMode.Auto));
return new AsyncDisposableValue<ISubject<ChatMessage>>
{
Value = subject,
Disposable = disposable
};
}
catch
{
await disposable.DisposeAsync();
throw;
}
});
}
public ValueTask<IAsyncDisposableReference<ISubject<ChatMessage>>> GetOrCreateChatRoom(ChatRoomId id, CancellationToken cancellationToken)
{
return _table.GetOrCreateAsync(id, cancellationToken);
}
}
As you can see, if we combine the right tools, we can achieve great things with very little effort.