Federico Alterio

Building Modern Realtime .NET Apps: SignalR, R3, AsyncR3, SignalsDotnet, and Redis

Source Code

You can find the fully functional app with .NET Aspire — including Redis, Avalonia, three ASP.NET Core load balanced apps, and a bonus SignalR with SignalR backplane (yes, it’s not a typo!) — in the R3Async Playground repository.

Intro

First of all, I’d like to ask a question: which API do you prefer?

void InvokeSomeApi(Request request, Action<Response> onCompleted);
Task<Response> InvokeSomeApiAsync(Request request, CancellationToken cancellationToken);

Of course, you chose the second one.

  • We have built-in language support with await.
  • No need to deal with callbacks manually.
  • A standard, built-in way to support cancellation.
  • A standard way to control continuation behavior (TaskAwaiter).
  • Task already solves this problem in a well-established way — we shouldn’t reinvent it every time.

Second question: which one do you prefer?

record Notification;

interface IConsumer
{
    public Task OnNotification(Notification notification, CancellationToken cancellationToken);
}

interface IProducer
{
    void Subscribe(IConsumer client);
    void Unsubscribe(IConsumer client);
}
Or this one?
record Notification;
interface IProducer
{
    IAsyncEnumerable<Notification> GetNotificationStream();
}

I would have no doubts:

  • We have built-in language support (await foreach).
  • We have a standard way to handle unsubscription via .WithCancellation(cancellationToken).
  • We get LINQ support (built into .NET 10 and later; available earlier through the System.Linq.Async package).
  • We don’t need to introduce an explicit IConsumer interface.
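To make the unsubscription point concrete, here is a minimal sketch of a consumer that stops an endless stream by cancelling its token. The local `GetNotificationStream` producer is a hypothetical stand-in that just counts upward; it is not part of the article's code.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical producer: emits an increasing counter until the consumer cancels.
static async IAsyncEnumerable<int> GetNotificationStream(
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    for (var i = 0; ; i++)
    {
        cancellationToken.ThrowIfCancellationRequested();
        yield return i;
        await Task.Yield();
    }
}

using var cts = new CancellationTokenSource();
var received = new List<int>();

try
{
    // WithCancellation flows the token into the [EnumeratorCancellation] parameter.
    await foreach (var n in GetNotificationStream().WithCancellation(cts.Token))
    {
        received.Add(n);
        if (received.Count == 3)
        {
            cts.Cancel(); // "Unsubscribe": the producer observes the token on its next step.
        }
    }
}
catch (OperationCanceledException)
{
    // Expected: cancellation is how the consumer ends an infinite stream.
}

Console.WriteLine(string.Join(", ", received)); // prints "0, 1, 2"
```

No `IConsumer` interface and no explicit `Unsubscribe` call are needed: the token is the whole unsubscription protocol.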

Final question: So why is the first approach still the default for many .NET developers using SignalR?

I think it often comes down to a lack of familiarity with reactive patterns.

Would you ever think of enumerating a list without using an IEnumerable? That would be crazy: we would be reinventing enumerables every time.

So why on earth are we reinventing streams over and over again?

We already have basically three stream patterns in .NET:

  • IAsyncEnumerable<T>: Pull-based async stream. Works naturally with await foreach.
  • Observable<T>: Push-based synchronous stream. See R3.
  • AsyncObservable<T>: Push-based asynchronous stream. See R3Async.

Create a (Reactive) Realtime Messaging App with SignalR

So, how can we get started? SignalR supports streaming.

All we need to do is create a hub and return a stream, like this:

public class AppHub(IChatService chatService) : Hub
{
    // Returns a stream of messages for the specified ChatRoomId
    public async IAsyncEnumerable<ChatMessage> GetChatMessages(
        ChatRoomId roomId,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var message in todo.WithCancellation(cancellationToken))
        {
            yield return message;
        }
    }

    // Joins a room and broadcasts messages sent by the client to other users
    public async Task JoinRoom(
        ChatRoomId roomId,
        string user,
        IAsyncEnumerable<ChatMessage> messages,
        CancellationToken cancellationToken)
    {
        await foreach (var message in messages.WithCancellation(cancellationToken))
        {
            // TODO: broadcast message to other users
        }
    }
}


Here, we’re omitting how the todo stream is implemented — the focus is on how SignalR handles the streaming endpoint.

  • The CancellationToken is tied to the client connection (or more accurately, to the client-side enumeration and connection).
  • yield return will push data into the socket automatically.

In this way, we’re essentially abstracting away SignalR — the client just consumes a standard IAsyncEnumerable.
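If you want to experiment with the hub method before the real backend exists, one possible stand-in for the omitted `todo` stream is a plain `Channel<T>` from the base class library. This is only an assumption for local experiments, not how the article implements the stream later on; the `string` payload is also just a placeholder for `ChatMessage`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the omitted `todo` stream: an in-memory channel.
var channel = Channel.CreateUnbounded<string>();

// Producer side: whatever receives messages writes them into the channel.
var producer = Task.Run(async () =>
{
    foreach (var text in new[] { "hello", "world" })
    {
        await channel.Writer.WriteAsync(text);
    }
    channel.Writer.Complete(); // Completing the writer ends the stream for readers.
});

// Consumer side: ReadAllAsync exposes the channel as an IAsyncEnumerable<T>.
var received = new List<string>();
await foreach (var message in channel.Reader.ReadAllAsync(CancellationToken.None))
{
    received.Add(message);
}

await producer;
Console.WriteLine(string.Join(" ", received)); // prints "hello world"
```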

Client Implementation with Avalonia

For the client, we’re using Avalonia.

Our goals:

  • Share a single SignalR connection across the entire app.
  • Dynamically create and dispose of the connection as needed.
  • Keep track of usage to know when to dispose — basically a reference counting system.

A good solution is to use RefCountLazy<T> from R3Async, which lets us lazily create the connection and automatically dispose it when no one is using it anymore.

public sealed class SignalRHubClient
{
    readonly RefCountLazy<HubConnection> _sharedConnection;

    public SignalRHubClient()
    {
        _sharedConnection = new(async token =>
        {
            var connection = new HubConnectionBuilder()
                             .WithUrl("http://localhost:5062/hub")
                             .Build();

            await connection.StartAsync(token);
            _isConnected.Value = true;
            return new()
            {
                Value = connection,
                Disposable = AsyncDisposable.Create(async () =>
                {
                    await connection.DisposeAsync();
                    _isConnected.Value = false; 
                })
            };
        });
    }

    readonly Signal<bool> _isConnected = new(false);
    public ISignal<bool> IsConnected => _isConnected;

    public async IAsyncEnumerable<ChatMessage> GetChatMessages(ChatRoomId roomId, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await using var hubRef = await _sharedConnection.GetAsync(cancellationToken);
        var connection = hubRef.Value;
        var stream = connection.StreamAsync<ChatMessage>("GetChatMessages", roomId, cancellationToken);
        await foreach (var message in stream)
        {
            yield return message;
        }
    }

    public async ValueTask JoinRoom(ChatRoomId roomId, string user, IAsyncEnumerable<ChatMessage> messages, CancellationToken cancellationToken = default)
    {
        await using var hubRef = await _sharedConnection.GetAsync(cancellationToken);
        await hubRef.Value.InvokeAsync("JoinRoom", roomId, user, messages, cancellationToken: cancellationToken);
    }
}

Basically, RefCountLazy<T> is a thread-safe helper that tracks references to our Hub connection.

  • GetAsync() returns a "disposable reference" to the connection.
  • Calling DisposeAsync() on that reference decrements the reference count.
  • When the reference count reaches 0, the connection is automatically disposed.
  • When the reference count goes from 0 to 1, the connection is created again — and so on.
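The counting rules above can be illustrated with a deliberately simplified sketch. `RefCountedLazy<T>` and `Lease` are hypothetical names invented for this example; this is not R3Async's implementation, only a small model of the 0 → 1 → 0 lifecycle guarded by a `SemaphoreSlim`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var created = 0;
var disposed = 0;

var shared = new RefCountedLazy<string>(
    create: _ => { created++; return Task.FromResult("connection"); },
    dispose: _ => { disposed++; return Task.CompletedTask; });

var first = await shared.AcquireAsync();      // 0 -> 1: the resource is created
var second = await shared.AcquireAsync();     // 1 -> 2: the resource is reused
await first.DisposeAsync();                   // 2 -> 1: still alive
await second.DisposeAsync();                  // 1 -> 0: the resource is disposed
await using (await shared.AcquireAsync()) { } // 0 -> 1 -> 0: created and disposed again

Console.WriteLine($"created={created}, disposed={disposed}"); // prints "created=2, disposed=2"

// Hypothetical, simplified illustration of reference counting; not R3Async's implementation.
sealed class RefCountedLazy<T>(Func<CancellationToken, Task<T>> create, Func<T, Task> dispose)
{
    readonly SemaphoreSlim _gate = new(1, 1);
    int _count;
    T? _value;

    public async Task<Lease> AcquireAsync(CancellationToken cancellationToken = default)
    {
        await _gate.WaitAsync(cancellationToken);
        try
        {
            if (_count == 0)
            {
                _value = await create(cancellationToken); // first reference creates the resource
            }
            _count++;
            return new Lease(this, _value!);
        }
        finally
        {
            _gate.Release();
        }
    }

    async ValueTask ReleaseAsync()
    {
        await _gate.WaitAsync();
        try
        {
            if (--_count == 0)
            {
                var value = _value!;
                _value = default;
                await dispose(value); // last reference disposes the resource
            }
        }
        finally
        {
            _gate.Release();
        }
    }

    public sealed class Lease(RefCountedLazy<T> owner, T value) : IAsyncDisposable
    {
        int _released;
        public T Value { get; } = value;

        // The Interlocked guard makes a double dispose harmless.
        public ValueTask DisposeAsync() =>
            Interlocked.Exchange(ref _released, 1) == 0 ? owner.ReleaseAsync() : ValueTask.CompletedTask;
    }
}
```

The lease object plays the role of the "disposable reference": callers never touch the counter directly, they only dispose what they acquired.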

Effectively, we are moving the IAsyncEnumerable from the server to the client!

ViewModel Implementation

Here we use R3 and SignalsDotnet to automatically subscribe to and unsubscribe from IAsyncEnumerable streams based on property changes.

public class ChatViewModel
{
    readonly SignalRHubClient _hubClient = new();

    public ChatViewModel(IReadOnlySignal<string> userName)
    {
        var computedFactory = ComputedSignalFactory.Default
                                                   .DisconnectEverythingWhen(Disconnected.Values);

        // Command to send messages
        SendCommand = computedFactory
            .ComputedObservable(
                () => _hubClient.IsConnected.Value && !string.IsNullOrWhiteSpace(DraftMessage.Value), 
                () => new(false))
            .ToReactiveCommand(
                _ => ToSendMessage.OnNext(new(userName.Value, DraftMessage.Value))
            );

        // Subscribe to incoming chat messages
        computedFactory.AsyncEffect(async token =>
        {
            await foreach (var message in _hubClient.GetChatMessages(new(RoomName.Value), token))
            {
                AllChatMessages.Add(message);
            }
        }, ConcurrentChangeStrategy.CancelCurrent);

        // Join the chat room and broadcast outgoing messages
        computedFactory.AsyncEffect(async token =>
        {
            await _hubClient.JoinRoom(
                new(RoomName.Value),
                userName.Value,
                ToSendMessage.TakeUntil(token).ToAsyncEnumerable(),
                token
            );
        }, ConcurrentChangeStrategy.CancelCurrent);

        // Clear draft message after sending
        ToSendMessage.Subscribe(_ => DraftMessage.Value = "");
    }

    public Signal<bool> Disconnected { get; } = new(true);
    public Signal<string> DraftMessage { get; } = new();
    public Subject<ChatMessage> ToSendMessage { get; } = new();
    public Signal<string> RoomName { get; } = new("MyRoom");
    public ObservableCollection<ChatMessage> AllChatMessages { get; } = new();
    public ICommand SendCommand { get; }
}

See what’s happening here? We’re basically just enumerating an IAsyncEnumerable and adding items to a list — it really doesn’t get simpler than that.

SignalsDotnet takes care of all the dependency tracking for us:

  • Whenever RoomName.Value changes, the CancelCurrent strategy cancels the current stream.
  • This cancellation is automatically propagated to the server.
  • A new stream is then enumerated for the updated room, all automatically.
  • Everything is disposed automatically if the view is unloaded.
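The "cancel the current stream, then start a new one" behaviour can be modelled without any library. The sketch below uses only `CancellationTokenSource`; `SwitchRoomAsync` and `ListenAsync` are hypothetical helpers, and SignalsDotnet does considerably more than this (notably the dependency tracking that triggers the switch in the first place).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var log = new List<string>();
CancellationTokenSource? current = null;
Task running = Task.CompletedTask;

// Hypothetical helper: cancel the stream for the old room, then start one for the new room.
async Task SwitchRoomAsync(string room)
{
    current?.Cancel();
    await running;      // wait until the previous loop has observed the cancellation
    current?.Dispose();
    current = new CancellationTokenSource();
    running = ListenAsync(room, current.Token);
}

async Task ListenAsync(string room, CancellationToken token)
{
    log.Add($"subscribed to {room}");
    try
    {
        // Stands in for `await foreach` over the room's message stream.
        await Task.Delay(Timeout.Infinite, token);
    }
    catch (OperationCanceledException)
    {
        log.Add($"unsubscribed from {room}");
    }
}

await SwitchRoomAsync("MyRoom");
await SwitchRoomAsync("OtherRoom");

// prints "subscribed to MyRoom; unsubscribed from MyRoom; subscribed to OtherRoom"
Console.WriteLine(string.Join("; ", log));
```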

Ok, but how can we implement the backend?

Let’s go step by step.

First assumption: We have a single backend service — every client connects directly to this server.

Later, we can scale out by implementing a Redis backplane to synchronize messages across multiple server instances (we won’t use the built-in SignalR backplane for this; we’ll implement it from scratch).

Understand What We Need

So, what do we actually need?

We need a way to publish messages.

Don’t worry about the underlying technology yet — focus only on the method signature and how the API should look.

So we need something like this:

public interface IPublisher<T>
{
    IAsyncDisposable SubscribeAsync(Func<T, CancellationToken, ValueTask> onNextAsync);
    ValueTask PublishAsync(T value, CancellationToken cancellationToken);
}

Here’s the fun part: stop trying to implement this interface yourself.

This problem is already solved and has a well-known name in the ReactiveX world — it’s called a Subject.

All we need is the async version of it, and we can use R3Async to get exactly that.
So, what we really need is an ISubject<T>.

But is this enough? Not quite.

  • We need a subject per topic, so it’s more like:
  ConcurrentDictionary<string, ISubject<T>> _publishersByTopic;
  • Are we good now? Still not. We want the publisher to exist only while someone is subscribed and to be disposed automatically when no one needs it. We don't want the dictionary to hold zombie references.

That means we need reference counting again, but this time keyed by topic. We can use RefCountTable for this! It's basically a concurrent dictionary with a single GetOrCreateAsync method that counts how many active references each entry has.
So we need a field like this:

    readonly RefCountTable<ChatRoomId, ISubject<ChatMessage>> _roomPublisherByRoomId;

Maybe we can hide it behind an interface like this:

public interface IChatService
{
    ValueTask<IAsyncDisposableReference<ISubject<ChatMessage>>> GetOrCreateChatRoom(ChatRoomId id, CancellationToken cancellationToken);
}

Here, IAsyncDisposableReference is essentially the ref-counted reference.
Calling DisposeAsync() on it signals that we no longer need the publisher, and when the ref count reaches 0, it will automatically dispose the underlying implementation that exposes the subject.

Completing the hub

We are now ready to complete the Hub implementation:

public class AppHub(IChatService chatService) : Hub
{
    public async IAsyncEnumerable<ChatMessage> GetChatMessages(ChatRoomId roomId, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await using var chatRef = await chatService.GetOrCreateChatRoom(roomId, cancellationToken);
        var chatMessages = chatRef.Value;
        await foreach (var message in chatMessages.Values
                                          .ToAsyncEnumerable(static () => Channel.CreateUnbounded<ChatMessage>())
                                          .WithCancellation(cancellationToken))
        {
            yield return message;
        }
    }

    public async Task JoinRoom(ChatRoomId roomId, string user, IAsyncEnumerable<ChatMessage> messages)
    {
        var cancellationToken = Context.ConnectionAborted;
        await using var chatRef = await chatService.GetOrCreateChatRoom(roomId, cancellationToken);
        var chatMessages = chatRef.Value;
        await chatMessages.OnNextAsync(new ChatMessage(user, $"{user} joined the room"), cancellationToken);
        try
        {
            await foreach (var message in messages.WithCancellation(cancellationToken))
            {
                await chatMessages.OnNextAsync(message, cancellationToken);
            }
        }
        finally
        {
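            // The connection token may already be cancelled at this point, so publish the farewell with CancellationToken.None.
            await chatMessages.OnNextAsync(new ChatMessage(user, $"{user} left the room"), CancellationToken.None);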
        }
    }
}

How it works

  • JoinRoom will return only when the user has left the room.
  • Every message from the client is published to the subject.
  • GetChatMessages enumerates every notification from that subject and yields it to the client in real time.
  • References from RefCountTable are easy to manage using await using statements. They automatically track the lifetime of both publishers and consumers.
  • Note that we can move all of this logic out of the hub class, since it doesn't depend on SignalR beyond the connection's cancellation token. That also makes the logic easy to unit test.

Ok, but how is IChatService implemented?

Here’s the first implementation: InMemory

public class InMemoryChatService : IChatService
{
    readonly RefCountTable<ChatRoomId, ISubject<ChatMessage>> _chatByRoom;

    public InMemoryChatService()
    {
        _chatByRoom = new(async (roomId, cancellationToken) =>
        {
            var chat = Subject.Create<ChatMessage>(new SubjectCreationOptions
            {
                IsStateless = false,
                PublishingOption = PublishingOption.Concurrent
            });

            return new()
            {
                Value = chat,
                Disposable = AsyncDisposable.Create(() => chat.OnCompletedAsync(Result.Success))
            };
        });
    }

    public ValueTask<IAsyncDisposableReference<ISubject<ChatMessage>>> GetOrCreateChatRoom(ChatRoomId id, CancellationToken cancellationToken)
    {
        return _chatByRoom.GetOrCreateAsync(id, cancellationToken);
    }
}

Ok, this is good and all, but how do you scale it out?

Just provide another implementation of the subject (publisher)!
Let's use Redis as the backplane:

sealed class RedisSubscriberSubject<T> : ISubject<T> where T : class
{
    readonly ISubscriber _subscriber;
    readonly RedisChannel _channel;

    public RedisSubscriberSubject(ISubscriber subscriber, 
                                  RedisChannel channel)
    {
        _subscriber = subscriber;
        _channel = channel;

        Values = AsyncObservable.Create<T>((observer, _) =>
        {
            var messageQueue = subscriber.Subscribe(_channel);
            messageQueue.OnMessage(async message =>
            {
                var notification = JsonSerializer.Deserialize<Notification>((string)message.Message!)!;
                await notification.ForwardTo(observer, CancellationToken.None);
            });

            var disposable = AsyncDisposable.Create(async () => await messageQueue.UnsubscribeAsync());
            return new(disposable);
        });
    }

    public AsyncObservable<T> Values { get; }

    public ValueTask OnNextAsync(T value, CancellationToken cancellationToken) => ForwardNotification(Notification.FromOnNext(value));
    public ValueTask OnErrorResumeAsync(Exception error, CancellationToken cancellationToken) => ForwardNotification(Notification.FromOnErrorResume(error));
    public ValueTask OnCompletedAsync(Result result) => ForwardNotification(Notification.FromOnCompleted(result));
    async ValueTask ForwardNotification(Notification notification)
    {
        var json = JsonSerializer.Serialize(notification);
        await _subscriber.PublishAsync(_channel, json);
    }

    record Notification
    {
        public static Notification FromOnNext(T value) => new()
        {
            Value = value,
            ErrorMessage = null,
            IsCompleted = false
        };

        public static Notification FromOnErrorResume(Exception exception) => new()
        {
            Value = null,
            ErrorMessage = exception.Message,
            IsCompleted = false
        };

        public static Notification FromOnCompleted(Result result) => new()
        {
            Value = null,
            ErrorMessage = result.Exception?.Message,
            IsCompleted = true
        };

        public string? ErrorMessage { get; init; }
        public T? Value { get; init; }
        public bool IsCompleted { get; init; }

        public ValueTask ForwardTo(AsyncObserver<T> observer, CancellationToken cancellationToken) => (IsCompleted, ErrorMessage) switch
        {
            (false, null) => observer.OnNextAsync(Value!, cancellationToken),
            (false, not null) => observer.OnErrorResumeAsync(new(ErrorMessage), cancellationToken),
            (true, null) => observer.OnCompletedAsync(Result.Success),
            (true, not null) => observer.OnCompletedAsync(Result.Failure(new(ErrorMessage)))
        };
    }
}

public static class RedisSubscriberEx
{
    public static ISubject<T> ToSubject<T>(this ISubscriber subscriber, RedisChannel channel) where T : class => new RedisSubscriberSubject<T>(subscriber, channel);
}

The important bits are:

AsyncObservable.Create<T>((observer, _) =>
{
    var messageQueue = subscriber.Subscribe(_channel);
    messageQueue.OnMessage(async message =>
    {
        var notification = JsonSerializer.Deserialize<Notification>((string)message.Message!)!;
        await notification.ForwardTo(observer, CancellationToken.None);
    });

    var disposable = AsyncDisposable.Create(async () => await messageQueue.UnsubscribeAsync());
    return new(disposable);
});

Here we are mapping the Redis subscription to an AsyncObservable.

For publishing to Redis:

    public ValueTask OnNextAsync(T value, CancellationToken cancellationToken) => ForwardNotification(Notification.FromOnNext(value));
    public ValueTask OnErrorResumeAsync(Exception error, CancellationToken cancellationToken) => ForwardNotification(Notification.FromOnErrorResume(error));
    public ValueTask OnCompletedAsync(Result result) => ForwardNotification(Notification.FromOnCompleted(result));
    async ValueTask ForwardNotification(Notification notification)
    {
        var json = JsonSerializer.Serialize(notification);
        await _subscriber.PublishAsync(_channel, json);
    }
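Both directions rely on the same idea: every OnNext, OnErrorResume and OnCompleted call is flattened into one JSON envelope, and the receiver decides what it was from the (IsCompleted, ErrorMessage) pair. Here is a standalone sketch of that round trip using only System.Text.Json. The `Envelope<T>` record and the `Describe` function are hypothetical simplifications of the article's Notification record, with no Redis involved.

```csharp
using System;
using System.Text.Json;

// Publish side: wrap the value in an envelope and serialize it.
var json = JsonSerializer.Serialize(new Envelope<string>("hi", null, false));

// Subscribe side: deserialize the envelope and dispatch on (IsCompleted, ErrorMessage).
var envelope = JsonSerializer.Deserialize<Envelope<string>>(json)!;
var kind = Describe(envelope);
Console.WriteLine(kind); // prints "OnNext(hi)"

static string Describe<T>(Envelope<T> e) => (e.IsCompleted, e.ErrorMessage) switch
{
    (false, null) => $"OnNext({e.Value})",
    (false, not null) => $"OnErrorResume({e.ErrorMessage})",
    (true, null) => "OnCompleted(success)",
    (true, not null) => $"OnCompleted(failure: {e.ErrorMessage})"
};

// Hypothetical envelope mirroring the shape of the article's Notification record.
record Envelope<T>(T? Value, string? ErrorMessage, bool IsCompleted);
```

Note that only the exception message survives the trip: the exception type and stack trace are lost, which is usually acceptable for a chat backplane but worth keeping in mind.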

Now we can implement our RedisChatService like this:

public class RedisChatService : IChatService
{
    readonly RefCountTable<ChatRoomId, ISubject<ChatMessage>> _table;

    public RedisChatService(IConnectionMultiplexer connectionMultiplexer)
    {
        _table = new(async (roomId, _) =>
        {
            var subscriber = connectionMultiplexer.GetSubscriber();
            var disposable = AsyncDisposable.Create(async () => await subscriber.UnsubscribeAllAsync());
            try
            {
                var subject = subscriber.ToSubject<ChatMessage>(new(roomId.Name, RedisChannel.PatternMode.Auto));
                return new AsyncDisposableValue<ISubject<ChatMessage>>
                {
                    Value = subject,
                    Disposable = disposable
                };
            }
            catch
            {
                await disposable.DisposeAsync();
                throw;
            }
        });
    }

    public ValueTask<IAsyncDisposableReference<ISubject<ChatMessage>>> GetOrCreateChatRoom(ChatRoomId id, CancellationToken cancellationToken)
    {
        return _table.GetOrCreateAsync(id, cancellationToken);
    }
}

As you can see, if we combine the right tools, we can achieve great things with very little effort.
