DEV Community

Outdated Dev

Event-Driven Architecture Part 2: Event Streaming and Pub/Sub Patterns

Hello there!👋🧔‍♂️ Welcome back to Part 2 of our Event-Driven Architecture series! In Part 1, we covered the foundational concepts of Message Queues and Topics. Now, we'll dive deeper into Event Streaming, advanced Pub/Sub Patterns, and Best Practices for building robust Event-Driven systems.

Whether you're handling high-throughput event streams, implementing event sourcing, or designing complex event processing workflows, this guide will help you master the advanced aspects of Event-Driven Architecture.

Overview

In this part, we'll explore:

  1. Event Streaming - Continuous flow of events over time with replay capabilities
  2. Pub/Sub Patterns - Advanced patterns for event distribution
  3. Best Practices - Essential patterns and anti-patterns
  4. Real-World Examples - Putting it all together

Let's dive into Event Streaming, one of the most powerful patterns in Event-Driven Architecture.

1. Event Streaming

Event streaming is a continuous flow of events over time, stored in an append-only log. Think of it like a DVR: events are recorded as they happen and can be replayed later.

How Event Streaming Works

  1. Producers write events to a stream
  2. Stream stores events in order (append-only log)
  3. Consumers read events from any position in the stream
  4. Replay - Consumers can reprocess events from the beginning
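To make these four steps concrete, here is a minimal in-memory sketch of an append-only log. The `EventStream` class and its method names are made up for illustration and are not part of Kafka or any other library; a real stream would also persist and partition the log.

```python
from dataclasses import dataclass, field
from typing import Any, Iterator, List, Tuple


@dataclass
class EventStream:
    """Hypothetical in-memory append-only log (not a Kafka API)."""
    _log: List[Any] = field(default_factory=list)

    def append(self, event: Any) -> int:
        """Producer side: add an event to the end and return its offset."""
        self._log.append(event)
        return len(self._log) - 1

    def read(self, from_offset: int = 0) -> Iterator[Tuple[int, Any]]:
        """Consumer side: yield (offset, event) pairs starting at from_offset."""
        for offset in range(from_offset, len(self._log)):
            yield offset, self._log[offset]


stream = EventStream()
stream.append({"type": "OrderCreated", "order_id": 1})
stream.append({"type": "OrderPaid", "order_id": 1})
stream.append({"type": "OrderShipped", "order_id": 1})

# A consumer that joined late reads from offset 2 only.
latest = list(stream.read(from_offset=2))

# Replay: reading from offset 0 returns the full history again, in order.
replayed = [event["type"] for _, event in stream.read(from_offset=0)]
```

Because reading never removes anything from the log, any number of consumers can read at their own pace and start from whichever offset they like.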

Event Streaming Characteristics

  • Append-only - Events are never modified, only added
  • Ordered - Events maintain their order
  • Replayable - Can reprocess events from any point
  • Durable - Events persist for a configurable retention period (which can be indefinite)
  • Partitioned - Streams can be split into partitions for scalability
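Partitioning and ordering are closely linked: events that share a key go to the same partition, and ordering is only guaranteed within a partition. The sketch below shows the idea with a simple stable hash. The `partition_for` helper is hypothetical, and real clients use their own hash functions, so treat it as an illustration of the principle only.

```python
import hashlib
from collections import defaultdict
from typing import Dict, List


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition with a stable hash.

    Illustrative only: real Kafka clients use their own hash functions,
    but the idea (same key -> same partition) is the same.
    """
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


partitions: Dict[int, List[str]] = defaultdict(list)
events = [("order-1", "created"), ("order-2", "created"),
          ("order-1", "paid"), ("order-1", "shipped")]

for key, event_type in events:
    partitions[partition_for(key, num_partitions=3)].append(f"{key}:{event_type}")

# All events for order-1 sit in one partition, in the order they were produced.
order_1_partition = partitions[partition_for("order-1", 3)]
order_1_events = [e for e in order_1_partition if e.startswith("order-1:")]
```

This is why the C# and TypeScript producers below use the order ID as the message key: events for one order stay in sequence even when the topic has many partitions.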

Event Streaming Implementation

C# (.NET) - Using Apache Kafka

using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

// Producer
public class OrderEventProducer
{
    private readonly IProducer<string, string> _producer;

    public OrderEventProducer()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092"
        };
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    public async Task ProduceOrderCreated(OrderCreatedEvent orderCreated)
    {
        var message = JsonSerializer.Serialize(orderCreated);

        var kafkaMessage = new Message<string, string>
        {
            Key = orderCreated.OrderId.ToString(),
            Value = message
        };

        await _producer.ProduceAsync("order-events", kafkaMessage);
        Console.WriteLine($"Produced order created event: {orderCreated.OrderId}");
    }
}

// Consumer
public class OrderEventConsumer
{
    private readonly IConsumer<string, string> _consumer;

    public OrderEventConsumer()
    {
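        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "order-processor",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Disable auto-commit so the explicit Commit() call is the only place offsets advance
            EnableAutoCommit = false
        };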
        _consumer = new ConsumerBuilder<string, string>(config).Build();
        _consumer.Subscribe("order-events");
    }

    public void StartConsuming()
    {
        try
        {
            while (true)
            {
                var result = _consumer.Consume(TimeSpan.FromSeconds(1));

                if (result != null)
                {
                    var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(result.Message.Value);
                    ProcessOrder(orderCreated);

                    // Commit offset
                    _consumer.Commit(result);
                }
            }
        }
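        catch (Exception ex)
        {
            Console.WriteLine($"Error consuming: {ex.Message}");
        }
        finally
        {
            // Leave the consumer group cleanly so partitions are reassigned promptly
            _consumer.Close();
        }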
    }

    private void ProcessOrder(OrderCreatedEvent orderCreated)
    {
        Console.WriteLine($"Processing order: {orderCreated.OrderId}");
    }
}

Python - Using Apache Kafka

from kafka import KafkaProducer, KafkaConsumer
import json

# Producer
class OrderEventProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def produce_order_created(self, order_created):
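        # Key by order ID so all events for one order land on the same partition (keeps them ordered)
        self.producer.send(
            'order-events',
            key=str(order_created['order_id']).encode('utf-8'),
            value=order_created
        )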
        self.producer.flush()
        print(f"Produced order created event: {order_created['order_id']}")

# Consumer
class OrderEventConsumer:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'order-events',
            bootstrap_servers=['localhost:9092'],
            group_id='order-processor',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest'
        )

    def start_consuming(self):
        for message in self.consumer:
            order_created = message.value
            self.process_order(order_created)

    def process_order(self, order_created):
        print(f"Processing order: {order_created['order_id']}")

TypeScript (Node.js) - Using Apache Kafka

import { Kafka } from 'kafkajs';

const kafka = new Kafka({
    clientId: 'order-service',
    brokers: ['localhost:9092']
});

// Producer
class OrderEventProducer {
    private producer = kafka.producer();

    async connect() {
        await this.producer.connect();
    }

    async produceOrderCreated(orderCreated: OrderCreatedEvent) {
        await this.producer.send({
            topic: 'order-events',
            messages: [{
                key: orderCreated.orderId.toString(),
                value: JSON.stringify(orderCreated)
            }]
        });
        console.log(`Produced order created event: ${orderCreated.orderId}`);
    }
}

// Consumer
class OrderEventConsumer {
    private consumer = kafka.consumer({ groupId: 'order-processor' });

    async connect() {
        await this.consumer.connect();
        await this.consumer.subscribe({ topic: 'order-events', fromBeginning: true });
    }

    async startConsuming() {
        await this.consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                // Skip messages without a payload instead of parsing an empty object
                if (!message.value) return;
                const orderCreated: OrderCreatedEvent = JSON.parse(message.value.toString());
                await this.processOrder(orderCreated);
            }
        });
    }

    private async processOrder(orderCreated: OrderCreatedEvent) {
        console.log(`Processing order: ${orderCreated.orderId}`);
    }
}

Event Streaming Best Practices

  • Use consumer groups - Enable parallel processing and load balancing
  • Handle offsets - Track processing progress
  • Partition wisely - Distribute load across partitions
  • Idempotent producers - Prevent duplicate events
  • Schema registry - Validate event schemas

  • Don't lose offsets - Always commit offsets after processing
  • Don't ignore ordering - Understand partition ordering guarantees
  • Don't process synchronously - Use async processing for better throughput
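The "commit offsets after processing" rule is easiest to see in code. Below is a small sketch that assumes a consumer object that can be iterated and exposes `commit()`, which is how kafka-python's `KafkaConsumer` behaves when created with `enable_auto_commit=False`. The `FakeConsumer` class is a hypothetical stand-in so the example runs without a broker.

```python
from typing import Any, Callable, Iterable, List


def consume_at_least_once(consumer: Any, handle: Callable[[Any], None]) -> None:
    """Process each message, then commit.

    `consumer` is assumed to be iterable and to expose commit(), as
    kafka-python's KafkaConsumer does with enable_auto_commit=False.
    """
    for message in consumer:
        handle(message)      # if this raises, the offset is not committed
        consumer.commit()    # commit only after successful processing


class FakeConsumer:
    """Test double standing in for a real consumer (hypothetical)."""

    def __init__(self, messages: Iterable[str]):
        self._messages = list(messages)
        self.committed = 0

    def __iter__(self):
        return iter(self._messages)

    def commit(self) -> None:
        self.committed += 1


processed: List[str] = []


def flaky_handler(message: str) -> None:
    if message == "bad":
        raise ValueError("cannot process message")
    processed.append(message)


fake = FakeConsumer(["a", "b", "bad", "c"])
try:
    consume_at_least_once(fake, flaky_handler)
except ValueError:
    pass  # a real service would log, retry, or dead-letter here
```

Since the failing message is never committed, it would be delivered again after a restart. That gives at-least-once delivery, which is exactly why consumers also need to be idempotent.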

2. Pub/Sub Patterns

Publish-Subscribe (Pub/Sub) patterns define how events are distributed to subscribers. Let's explore common patterns:

2.1 Fan-Out Pattern

One publisher, many subscribers - All subscribers receive every message.

Publisher → Topic → [Subscriber1, Subscriber2, Subscriber3]

Use Case: Broadcasting notifications to multiple services

Example: When an order is created, notify Email Service, Inventory Service, Analytics Service, and Payment Service simultaneously.
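Here is a minimal in-process sketch of fan-out. The `FanOutTopic` class is hypothetical and only illustrates the delivery rule (every subscriber sees every message); a real broker would deliver over the network and asynchronously.

```python
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class FanOutTopic:
    """Hypothetical in-process topic: every subscriber gets every message."""

    def __init__(self) -> None:
        self._subscribers: List[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        self._subscribers.append(handler)

    def publish(self, event: Dict[str, Any]) -> None:
        for handler in self._subscribers:
            handler(event)


received: Dict[str, List[int]] = {"email": [], "inventory": [], "analytics": []}

topic = FanOutTopic()
topic.subscribe(lambda e: received["email"].append(e["order_id"]))
topic.subscribe(lambda e: received["inventory"].append(e["order_id"]))
topic.subscribe(lambda e: received["analytics"].append(e["order_id"]))

# One publish call reaches all three subscribers.
topic.publish({"type": "OrderCreated", "order_id": 42})
```

The publisher never references the subscribers directly, so adding a fourth service is just one more `subscribe` call.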

2.2 Topic-Based Pattern

Messages filtered by topic - Subscribers subscribe to specific topics.

Publisher → Topic: "orders" → [OrderService1, OrderService2]
Publisher → Topic: "payments" → [PaymentService1]

Use Case: Organizing events by domain or type

Example: Separate topics for "orders", "payments", "inventory" allow services to subscribe only to relevant events.
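A topic-based broker can be sketched as a dictionary from topic name to subscriber list. The `TopicBroker` class below is a hypothetical in-process illustration, not the API of any real message broker.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class TopicBroker:
    """Hypothetical in-process broker that routes by topic name."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event: Dict[str, Any]) -> int:
        """Deliver to subscribers of `topic`; return how many were notified."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(event)
        return len(handlers)


order_log: List[Dict[str, Any]] = []
payment_log: List[Dict[str, Any]] = []

broker = TopicBroker()
broker.subscribe("orders", order_log.append)
broker.subscribe("payments", payment_log.append)

broker.publish("orders", {"order_id": 1})
broker.publish("payments", {"order_id": 1, "amount": 250})
unrouted = broker.publish("inventory", {"sku": "ABC"})  # nobody subscribed
```

Each service only sees the topics it asked for, so the payment handler never has to inspect and discard order events.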

2.3 Content-Based Pattern

Messages filtered by content - Subscribers filter based on message content.

Publisher → Topic → [Subscriber1 (filters: amount > 100), Subscriber2 (filters: status = "pending")]

Use Case: Conditional processing based on event data

Example: High-value order processor only processes orders over $1000, while urgent order processor handles orders marked as "urgent".
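The high-value and urgent processors from the example can be sketched with predicate functions. The `ContentBasedTopic` class and the `total` and `priority` field names are assumptions made for this illustration; some brokers can apply filters like these on the server side, while with others each subscriber filters after receiving.

```python
from typing import Any, Callable, Dict, List, Tuple

Event = Dict[str, Any]
Predicate = Callable[[Event], bool]
Handler = Callable[[Event], None]


class ContentBasedTopic:
    """Hypothetical topic where each subscriber registers a filter."""

    def __init__(self) -> None:
        self._subscriptions: List[Tuple[Predicate, Handler]] = []

    def subscribe(self, predicate: Predicate, handler: Handler) -> None:
        self._subscriptions.append((predicate, handler))

    def publish(self, event: Event) -> None:
        # Deliver only to subscribers whose predicate accepts the event.
        for predicate, handler in self._subscriptions:
            if predicate(event):
                handler(event)


high_value: List[int] = []
urgent: List[int] = []

topic = ContentBasedTopic()
topic.subscribe(lambda e: e["total"] > 1000, lambda e: high_value.append(e["order_id"]))
topic.subscribe(lambda e: e.get("priority") == "urgent", lambda e: urgent.append(e["order_id"]))

topic.publish({"order_id": 1, "total": 50})
topic.publish({"order_id": 2, "total": 2500})
topic.publish({"order_id": 3, "total": 1500, "priority": "urgent"})
```

Note that order 3 matches both filters and is delivered to both processors, while order 1 matches neither and is delivered to no one.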

2.4 Event Sourcing Pattern

Store all events - Maintain complete event history for replay and audit.

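using System;
using System.Collections.Generic;
using System.Linq;

// Event Store (in-memory for illustration; a real store would persist events durably)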
public class EventStore
{
    private readonly List<Event> _events = new List<Event>();

    public void Append(Event evt)
    {
        _events.Add(evt);
        // Also publish to event stream
        PublishToStream(evt);
    }

    public IEnumerable<Event> GetEvents(string aggregateId)
    {
        return _events.Where(e => e.AggregateId == aggregateId);
    }

    public T RebuildAggregate<T>(string aggregateId) where T : AggregateRoot, new()
    {
        var events = GetEvents(aggregateId);
        var aggregate = new T();

        foreach (var evt in events)
        {
            aggregate.Apply(evt);
        }

        return aggregate;
    }
}

// Example aggregate whose state is derived from events
public class OrderAggregate : AggregateRoot
{
    public Guid OrderId { get; private set; }
    public decimal Total { get; private set; }
    public OrderStatus Status { get; private set; }

    public void CreateOrder(CreateOrderCommand command)
    {
        Apply(new OrderCreatedEvent
        {
            OrderId = command.OrderId,
            CustomerId = command.CustomerId,
            Total = command.Total
        });
    }

    public void Apply(OrderCreatedEvent evt)
    {
        OrderId = evt.OrderId;
        Total = evt.Total;
        Status = OrderStatus.Created;
    }
}

Use Case: Audit trails, time travel debugging, rebuilding state

Benefits:

  • Complete audit trail
  • Ability to replay events
  • Time-travel debugging
  • Rebuild state from events
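Replay and time travel both come down to folding events into state, optionally stopping early. Here is a small sketch of that idea. The event kinds and the `OrderEvent`, `OrderState`, `apply`, and `rebuild` names are all hypothetical and chosen only for this example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class OrderEvent:
    """Hypothetical event shape: a type name plus an optional amount."""
    kind: str
    amount: float = 0.0


@dataclass
class OrderState:
    status: str = "none"
    total: float = 0.0


def apply(state: OrderState, event: OrderEvent) -> OrderState:
    """Pure transition function: current state + event -> next state."""
    if event.kind == "OrderCreated":
        return OrderState(status="created", total=event.amount)
    if event.kind == "ItemAdded":
        return OrderState(status=state.status, total=state.total + event.amount)
    if event.kind == "OrderCancelled":
        return OrderState(status="cancelled", total=state.total)
    return state  # unknown events are ignored in this sketch


def rebuild(events: List[OrderEvent], up_to: Optional[int] = None) -> OrderState:
    """Fold events into state; `up_to` limits how many events are replayed."""
    state = OrderState()
    for event in events[:up_to]:
        state = apply(state, event)
    return state


history = [
    OrderEvent("OrderCreated", 100.0),
    OrderEvent("ItemAdded", 25.0),
    OrderEvent("OrderCancelled"),
]

current = rebuild(history)            # state after all events
before_cancel = rebuild(history, 2)   # "time travel": state after two events
```

Because `apply` never mutates its inputs, replaying the same history always produces the same state, which is what makes the audit trail trustworthy.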

Choosing the Right Pattern

| Pattern | Use Case | Example |
|---|---|---|
| Message Queue | Task distribution, one consumer per message | Order processing queue |
| Topics | Broadcasting to multiple services | Order created → Email, Inventory, Analytics |
| Event Streaming | Event sourcing, replay, high throughput | User activity stream, audit log |
| Fan-Out | Broadcasting to all subscribers | System-wide notifications |
| Topic-Based | Organizing by domain | Orders, Payments, Inventory topics |
| Content-Based | Conditional processing | High-value orders, urgent requests |
| Event Sourcing | Complete event history | Audit trails, state rebuilding |

Best Practices for Event-Driven Architecture

General Best Practices

  • Design for failure - Services should handle missing or duplicate events
  • Idempotency - Make operations safe to retry
  • Event versioning - Version your events for backward compatibility
  • Schema evolution - Plan for schema changes
  • Monitoring - Track event throughput, latency, and errors
  • Dead letter queues - Handle failed messages gracefully
  • Event ordering - Understand ordering guarantees
  • Backpressure - Handle slow consumers gracefully
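To show what "handle failed messages gracefully" can look like, here is a sketch that retries each event a few times and then parks it in a dead letter queue. The `process_with_dlq` function, the plain list used as the queue, and the retry limit of 3 are assumptions for illustration; a real system would typically add a delay between attempts and use a broker-backed queue.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


def process_with_dlq(
    events: List[Event],
    handle: Callable[[Event], None],
    dead_letters: List[Event],
    max_attempts: int = 3,
) -> None:
    """Try each event up to max_attempts times, then park it in the DLQ."""
    for event in events:
        for attempt in range(1, max_attempts + 1):
            try:
                handle(event)
                break  # success: move on to the next event
            except Exception as error:
                if attempt == max_attempts:
                    # Keep the failure reason so the event can be inspected later.
                    dead_letters.append({"event": event, "error": str(error)})


handled: List[int] = []
attempts: Dict[int, int] = {}


def handler(event: Event) -> None:
    attempts[event["id"]] = attempts.get(event["id"], 0) + 1
    if event["id"] == 2:
        raise ValueError("malformed payload")      # always fails
    if event["id"] == 3 and attempts[3] < 2:
        raise TimeoutError("temporary outage")     # fails once, then succeeds
    handled.append(event["id"])


dlq: List[Event] = []
process_with_dlq([{"id": 1}, {"id": 2}, {"id": 3}], handler, dlq)
```

The poison message (id 2) ends up in the dead letter queue without blocking the events behind it, and the transient failure (id 3) recovers on its second attempt.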

Implementing Idempotency

public class OrderService
{
    private readonly IEventStore _eventStore;

    public async Task ProcessOrderCreated(OrderCreatedEvent evt)
    {
        // Check if already processed
        if (await _eventStore.HasEventBeenProcessed(evt.EventId))
        {
            return; // Already processed, skip
        }

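        // Process order
        await CreateOrder(evt);

        // Mark as processed. In production, do this in the same transaction as
        // CreateOrder (or enforce a unique constraint on EventId) so that a crash or a
        // concurrent duplicate between the check and the mark cannot process the event twice.
        await _eventStore.MarkEventAsProcessed(evt.EventId);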
    }
}

Event Versioning

public class OrderCreatedEventV1
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal Total { get; set; }
}

public class OrderCreatedEventV2
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal Total { get; set; }
    public string Currency { get; set; } // New field
    public DateTime CreatedAt { get; set; } // New field
}

// Consumer handles both versions
public class OrderProcessor
{
    public void ProcessOrderCreated(object evt)
    {
        if (evt is OrderCreatedEventV2 v2)
        {
            ProcessV2(v2);
        }
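        else if (evt is OrderCreatedEventV1 v1)
        {
            ProcessV1(v1);
        }
        else
        {
            // Fail loudly instead of silently dropping unknown versions
            throw new System.NotSupportedException($"Unsupported event type: {evt?.GetType().Name}");
        }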
    }
}
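An alternative to branching on every version in every consumer is to "upcast" old events to the newest shape at the edge, so the rest of the code only deals with one version. The sketch below assumes events are dictionaries carrying a `version` field, and it assumes "USD" is an acceptable default currency for old events. Both are illustrative choices, not something the classes above prescribe.

```python
from typing import Any, Dict

CURRENT_VERSION = 2


def upcast_order_created(event: Dict[str, Any]) -> Dict[str, Any]:
    """Convert an OrderCreated payload of any known version to version 2.

    The "version" field and the "USD" default are assumptions made for
    this sketch; the field names mirror the V1/V2 classes above.
    """
    version = event.get("version", 1)  # treat unversioned payloads as V1
    if version == CURRENT_VERSION:
        return event
    if version == 1:
        upgraded = dict(event)  # copy so the stored event is not mutated
        upgraded["version"] = 2
        upgraded["currency"] = "USD"   # assumed default for old events
        upgraded["created_at"] = None  # unknown for V1 events
        return upgraded
    raise ValueError(f"Unsupported OrderCreated version: {version}")


v1_event = {"order_id": "o-1", "customer_id": "c-1", "total": 99.5}
v2_event = upcast_order_created(v1_event)
```

With this approach, adding a V3 later means writing one more upcasting step instead of touching every handler.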

Anti-Patterns to Avoid

  • Tight coupling - Don't make services depend on specific implementations
  • Synchronous calls in async systems - Don't block on event processing
  • Ignoring failures - Always handle and retry failures
  • No idempotency - Don't assume events are processed exactly once
  • Ignoring ordering - Understand when ordering matters
  • No monitoring - Monitor event processing health

Real-World Example: E-Commerce System

Let's see how Event-Driven Architecture works in an e-commerce system:

// Order Service (Producer)
public class OrderService
{
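    private readonly IEventPublisher _eventPublisher;
    private readonly IOrderRepository _orderRepository; // assumed repository abstraction, used by CreateOrder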

    public async Task<Order> CreateOrder(CreateOrderRequest request)
    {
        var order = new Order
        {
            Id = Guid.NewGuid(),
            CustomerId = request.CustomerId,
            Items = request.Items,
            Total = CalculateTotal(request.Items)
        };

        // Save order
        await _orderRepository.Save(order);

        // Publish event
        await _eventPublisher.PublishAsync(new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            TotalAmount = order.Total,
            Items = order.Items
        });

        return order;
    }
}

// Email Service (Consumer)
public class EmailService : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent evt)
    {
        await SendOrderConfirmationEmail(evt.CustomerId, evt.OrderId);
    }
}

// Inventory Service (Consumer)
public class InventoryService : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent evt)
    {
        foreach (var item in evt.Items)
        {
            await DecreaseStock(item.ProductId, item.Quantity);
        }
    }
}

// Payment Service (Consumer)
public class PaymentService : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent evt)
    {
        await ProcessPayment(evt.OrderId, evt.TotalAmount);
    }
}

// Analytics Service (Consumer)
public class AnalyticsService : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent evt)
    {
        await TrackOrderCreated(evt);
    }
}

System Flow

  1. Order Service creates an order and publishes OrderCreatedEvent
  2. Email Service receives event → sends confirmation email
  3. Inventory Service receives event → decreases stock
  4. Payment Service receives event → processes payment
  5. Analytics Service receives event → tracks metrics

All services process independently and asynchronously!
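To illustrate what "independently and asynchronously" means, here is a small asyncio sketch of the same flow. The `publish` helper and the service functions are hypothetical stand-ins for real services behind a broker. The point is that the handlers run concurrently and a failure in one (analytics here) does not stop the others.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Event = Dict[str, Any]
Handler = Callable[[Event], Awaitable[None]]

log: List[str] = []


async def email_service(event: Event) -> None:
    log.append(f"email: confirmation for order {event['order_id']}")


async def inventory_service(event: Event) -> None:
    for item in event["items"]:
        log.append(f"inventory: -{item['quantity']} of {item['product_id']}")


async def analytics_service(event: Event) -> None:
    raise RuntimeError("analytics backend is down")  # simulated failure


async def publish(event: Event, handlers: List[Handler]) -> List[Any]:
    """Run all handlers concurrently; one failure does not stop the others."""
    return await asyncio.gather(*(h(event) for h in handlers), return_exceptions=True)


order_created = {"order_id": 7, "items": [{"product_id": "sku-1", "quantity": 2}]}
results = asyncio.run(
    publish(order_created, [email_service, inventory_service, analytics_service])
)
failures = [r for r in results if isinstance(r, Exception)]
```

In a real deployment the broker provides this isolation, and each failed handler would be retried or dead-lettered on its own.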

Conclusion

Event-Driven Architecture is a powerful pattern for building scalable, resilient, and loosely coupled systems. By understanding Event Streaming, Pub/Sub Patterns, and Best Practices, you can design systems that handle high volumes of events and scale seamlessly.

Key Takeaways:

  1. Event Streaming - Use for high-throughput, replayable event processing
  2. Pub/Sub Patterns - Choose the right pattern for your use case
  3. Event Sourcing - Maintain complete event history for audit and replay
  4. Design for failure - Always handle failures gracefully
  5. Idempotency - Make operations safe to retry
  6. Event versioning - Plan for schema evolution
  7. Monitor everything - Track event processing health

Remember: Event-Driven Architecture isn't a silver bullet. It adds complexity, but it provides significant benefits for the right use cases. Use it when you need loose coupling, high scalability, and real-time processing.

Start small, learn from your mistakes, and gradually adopt Event-Driven Architecture where it makes sense. Your systems will thank you!

Stay eventful, and happy coding! 🚀📡
