IPC Event Aggregator in C# (Part-2)

In the previous article, Simple Event Aggregator in C#, we covered creating a basic “in-memory” mechanism for passing events within a single process and memory space.

Now, let’s extend it to communicate between separate executables (different processes). For that we’ll need what is known as an Inter-Process Communication (IPC) mechanism. The challenge is that the operating system isolates separate executables from one another, so they cannot simply share in-memory objects.

Common IPC Solutions

An event aggregator that spans executables is essentially built on one of the following IPC mechanisms under the hood. You can also use existing libraries that abstract this communication for you, such as those built on named pipes or network communication, to get an “aggregator” experience.

  • Named Pipes:
    This is a reliable and relatively simple method for communication between processes on the same machine. The processes do not need to be related; they only need to agree on the pipe name. One process acts as the server, and the other as the client.
  • Memory-Mapped Files:
    This method allows multiple processes to read from and write to a shared block of memory. You would still need an additional signaling mechanism (like a Semaphore or EventWaitHandle) to notify other processes when data is updated.
  • TCP/IP Sockets or Web Services (WCF/gRPC):
    For more complex scenarios, network communication is a robust option. One application can host a service (e.g., a gRPC service or a simple HTTP API) and the other can make calls to it.
  • Message Queues:
    Technologies like RabbitMQ or Azure Service Bus are designed for reliable, decoupled communication, especially useful for long-running processes or systems that need guaranteed delivery across machines or even networks. 

Alternative Approaches

Shared Database or File

A shared database or file can also be used. Though not always recommended, it can be a reasonable choice depending on the project’s specific conditions.

We’ve used automated file queues in the past for hospital fax machine queues. Doctors or nurses would fax a prescription to be picked up by the internal pharmacy. The incoming fax is stored as an image (TIFF, PNG, etc.) in a folder to be scraped by a background [Windows] service, which would then archive the prescription and perform optical character recognition (OCR) to create an order for the patient.

This was nice until overzealous IT departments or anti-virus software blocked folder reads and writes, and the queues would get backed up. Don’t do that.

Extending the Event Aggregator

Let’s extend our weak-reference event aggregator from the previous post to enable IPC support.

public interface IEventAggregator
{
  void Subscribe<TEvent>(Action<TEvent> handler);
  void Unsubscribe<TEvent>(Action<TEvent> handler);
  void Publish<TEvent>(TEvent eventData);

  // New: IPC Support
  void EnableIpc(IEventTransport transport);
}

Next, add a Transport Abstraction

public interface IEventTransport
{
  void Send<TEvent>(TEvent eventData);
  void StartListening<TEvent>(Action<TEvent> onEventReceived);
}
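
To make the contract concrete, here is a minimal sketch of a transport that never leaves the process. LoopbackTransport is a hypothetical name and not part of the original aggregator; it simply hands every sent event to the listeners registered for that type. Something like this can be handy in unit tests before wiring up a real pipe or socket.

```csharp
using System;
using System.Collections.Concurrent;

// Usage: an event sent through the transport reaches the registered listener.
var transport = new LoopbackTransport();
transport.StartListening<string>(msg => Console.WriteLine($"Received: {msg}"));
transport.Send("hello");

public interface IEventTransport
{
  void Send<TEvent>(TEvent eventData);
  void StartListening<TEvent>(Action<TEvent> onEventReceived);
}

// Hypothetical in-process transport (an assumption for illustration only)
public class LoopbackTransport : IEventTransport
{
  // One combined delegate per event type
  private readonly ConcurrentDictionary<Type, Delegate> _listeners = new();

  public void Send<TEvent>(TEvent eventData)
  {
    if (_listeners.TryGetValue(typeof(TEvent), out var listener))
      ((Action<TEvent>)listener)(eventData);
  }

  public void StartListening<TEvent>(Action<TEvent> onEventReceived)
  {
    _listeners.AddOrUpdate(typeof(TEvent),
      _ => onEventReceived,
      (_, existing) => Delegate.Combine(existing, onEventReceived)!);
  }
}
```

Because the real transports implement the same two methods, the aggregator does not need to know whether an event crossed a process boundary or not.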

Update the EventAggregator class to hold an optional IEventTransport, which is used only when IPC is enabled.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class EventAggregator : IEventAggregator
{
  private readonly ConcurrentDictionary<Type, List<WeakReference>> _subscribers = new();
  private IEventTransport? _transport;

  public void EnableIpc(IEventTransport transport)
  {
    _transport = transport;
  }

  public void Subscribe<TEvent>(Action<TEvent> handler)
  {
    var eventType = typeof(TEvent);
    var weakHandler = new WeakReference(handler);

    // GetOrAdd fetches or creates the list; List<T> itself is not thread-safe, so lock before mutating
    var handlers = _subscribers.GetOrAdd(eventType, _ => new List<WeakReference>());
    lock (handlers)
    {
      handlers.Add(weakHandler);
    }

    // If IPC is enabled, start listening for remote events.
    // Note: handlers subscribed before EnableIpc is called are not registered with the transport.
    _transport?.StartListening<TEvent>(handler);
  }

  public void Unsubscribe<TEvent>(Action<TEvent> handler)
  {
    // ... See previous post for implementation ...
  }

  public void Publish<TEvent>(TEvent eventData)
  {
    var eventType = typeof(TEvent);
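    if (_subscribers.TryGetValue(eventType, out var handlers))
    {
      var liveHandlers = new List<Action<TEvent>>();

      // Prune dead references and snapshot the live handlers under the lock
      lock (handlers)
      {
        handlers.RemoveAll(weakRef => !weakRef.IsAlive);
        foreach (var weakRef in handlers)
        {
          if (weakRef.Target is Action<TEvent> live)
            liveHandlers.Add(live);
        }
      }

      // Invoke outside the lock so a handler can subscribe or publish without deadlocking
      foreach (var handler in liveHandlers)
        handler(eventData);
    }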

    // Send to IPC transport if enabled
    _transport?.Send(eventData);
  }
}

Example IPC Transport

Dependency Injection Registration

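// Resolve the registered services, then hand the transport to the aggregator
var aggregator = app.Services.GetRequiredService<IEventAggregator>();
var transport = app.Services.GetRequiredService<IEventTransport>();
aggregator.EnableIpc(transport);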
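
The lines above resolve the services but do not show how they get registered. The following is a minimal sketch of one way to do that with Microsoft.Extensions.DependencyInjection. It assumes a plain ServiceCollection (with a host or web application builder, builder.Services would take its place), and it uses trimmed stand-in types plus a hypothetical ConsoleTransport so the sketch compiles on its own.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

// One transport and one aggregator per process
services.AddSingleton<IEventTransport, ConsoleTransport>();
services.AddSingleton<IEventAggregator>(sp =>
{
  var aggregator = new EventAggregator();
  aggregator.EnableIpc(sp.GetRequiredService<IEventTransport>());
  return aggregator;
});

using var provider = services.BuildServiceProvider();
provider.GetRequiredService<IEventAggregator>().Publish("hello");

// --- Trimmed stand-ins so the sketch compiles on its own ---
public interface IEventTransport
{
  void Send<TEvent>(TEvent eventData);
}

public interface IEventAggregator
{
  void Publish<TEvent>(TEvent eventData);
  void EnableIpc(IEventTransport transport);
}

// Hypothetical transport that only logs what it would send
public class ConsoleTransport : IEventTransport
{
  public void Send<TEvent>(TEvent eventData) =>
    Console.WriteLine($"Sending {typeof(TEvent).Name}: {eventData}");
}

public class EventAggregator : IEventAggregator
{
  private IEventTransport? _transport;
  public void EnableIpc(IEventTransport transport) => _transport = transport;
  public void Publish<TEvent>(TEvent eventData) => _transport?.Send(eventData);
}
```

Registering the aggregator through a factory keeps the EnableIpc call in one place, so consumers that resolve IEventAggregator always get an instance that is already wired to the transport.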

Named Pipes Transport

public class NamedPipeTransport : IEventTransport
{
  private readonly string _pipeName;

  public NamedPipeTransport(string pipeName)
  {
    _pipeName = pipeName;
  }

  public void Send<TEvent>(TEvent eventData)
  {
    // Serialize and write to named pipe
  }

  public void StartListening<TEvent>(Action<TEvent> onEventReceived)
  {
    // Read from named pipe and deserialize
  }
}

Memory-Mapped File Transport

public class MemoryMappedTransport : IEventTransport
{
  public void Send<TEvent>(TEvent eventData)
  {
    // Serialize and write to memory-mapped file
  }

  public void StartListening<TEvent>(Action<TEvent> onEventReceived)
  {
    // Poll memory-mapped file for changes
  }
}

TCP/IP Transport

public class TcpTransport : IEventTransport
{
  private readonly string _host;
  private readonly int _port;

  public TcpTransport(string host, int port)
  {
    _host = host;
    _port = port;
  }

  public void Send<TEvent>(TEvent eventData)
  {
    // Serialize and send over TCP socket
  }

  public void StartListening<TEvent>(Action<TEvent> onEventReceived)
  {
    // Listen on TCP socket and deserialize
  }
}

Full Implementation

Now that you’ve gotten a high-level view of how to extend our Event Aggregator with optional IPC transports, let’s showcase a full implementation of each transport against the IEventTransport interface. Each example uses JSON serialization for simplicity.

Common JSON Serialization

using System.Text.Json;

public static class EventSerializer
{
  public static string Serialize<T>(T obj) => JsonSerializer.Serialize(obj);
  public static T Deserialize<T>(string json) => JsonSerializer.Deserialize<T>(json)!;
}
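
One thing the serializer alone does not solve: the receiving process has to know which type to deserialize into. A common workaround is to wrap each event in a small envelope that carries a type name next to the JSON payload. The sketch below assumes a hypothetical EventEnvelope record and a lookup table of known event types; neither is part of the original aggregator.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Usage: wrap an event, send the string over any transport, unwrap on the other side.
var wire = EnvelopeSerializer.Wrap(new UserCreatedEvent { Username = "alice" });
var evt = (UserCreatedEvent)EnvelopeSerializer.Unwrap(wire);
Console.WriteLine(evt.Username);

public class UserCreatedEvent
{
  public string Username { get; set; } = string.Empty;
}

// Hypothetical wire format: the event's type name plus its JSON payload
public record EventEnvelope(string TypeName, string Payload);

public static class EnvelopeSerializer
{
  // Only types listed here can be materialized; never resolve arbitrary type names from the wire
  private static readonly Dictionary<string, Type> KnownTypes = new()
  {
    [nameof(UserCreatedEvent)] = typeof(UserCreatedEvent),
  };

  public static string Wrap<TEvent>(TEvent eventData) where TEvent : notnull
  {
    var envelope = new EventEnvelope(typeof(TEvent).Name, JsonSerializer.Serialize(eventData));
    return JsonSerializer.Serialize(envelope);
  }

  public static object Unwrap(string json)
  {
    var envelope = JsonSerializer.Deserialize<EventEnvelope>(json)
      ?? throw new JsonException("Envelope was null.");

    if (!KnownTypes.TryGetValue(envelope.TypeName, out var type))
      throw new NotSupportedException($"Unknown event type '{envelope.TypeName}'.");

    return JsonSerializer.Deserialize(envelope.Payload, type)
      ?? throw new JsonException("Payload was null.");
  }
}
```

With an envelope like this, a single pipe or socket could carry several event types, at the cost of keeping the known-types table in sync across processes.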

Named Pipes Transport

This requires the use of System.IO.Pipes for IPC between processes.

using System;
using System.IO;
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;

public class NamedPipeTransport : IEventTransport
{
  private readonly string _pipeName;

  public NamedPipeTransport(string pipeName)
  {
    _pipeName = pipeName;
  }

  public void Send<TEvent>(TEvent eventData)
  {
    var json = EventSerializer.Serialize(eventData);
    using var client = new NamedPipeClientStream(".", _pipeName, PipeDirection.Out);
    // Connect() without a timeout blocks forever if no listener exists; give up after 5 seconds
    client.Connect(5000);

    var bytes = Encoding.UTF8.GetBytes(json);
    client.Write(bytes, 0, bytes.Length);
  }

  public void StartListening<TEvent>(Action<TEvent> onEventReceived)
  {
    Task.Run(() =>
    {
      // Each Send opens its own connection, so loop to keep listening after the first event
      while (true)
      {
        using var server = new NamedPipeServerStream(_pipeName, PipeDirection.In);
        server.WaitForConnection();
        using var reader = new StreamReader(server, Encoding.UTF8);
        var json = reader.ReadToEnd();

        var evt = EventSerializer.Deserialize<TEvent>(json);
        onEventReceived(evt);
      }
    });
  }
}

Memory-Mapped File Transport

Uses System.IO.MemoryMappedFiles for shared memory IPC.

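using System;
using System.IO.MemoryMappedFiles;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public class MemoryMappedTransport : IEventTransport, IDisposable
{
  private const int BufferSize = 4096;
  private readonly MemoryMappedFile _mmf;
  private readonly EventWaitHandle _signal;

  public MemoryMappedTransport(string mapName)
  {
    // Named maps and named wait handles are Windows-only. Keep the map open for the
    // transport's lifetime; the OS frees a named map once its last handle is closed.
    _mmf = MemoryMappedFile.CreateOrOpen(mapName, BufferSize);
    _signal = new EventWaitHandle(false, EventResetMode.AutoReset, mapName + "_signal");
  }

  public void Send<TEvent>(TEvent eventData)
  {
    var bytes = Encoding.UTF8.GetBytes(EventSerializer.Serialize(eventData));
    if (bytes.Length > BufferSize - sizeof(int))
      throw new InvalidOperationException("Event is too large for the shared buffer.");

    using var accessor = _mmf.CreateViewAccessor();
    accessor.Write(0, bytes.Length);                            // length prefix
    accessor.WriteArray(sizeof(int), bytes, 0, bytes.Length);   // payload
    _signal.Set();                                              // wake one waiting listener
  }

  public void StartListening<TEvent>(Action<TEvent> onEventReceived)
  {
    Task.Run(() =>
    {
      using var accessor = _mmf.CreateViewAccessor();
      while (true)
      {
        // Block until a writer signals new data instead of polling.
        // Simplification: single writer, single listener, no cross-process lock.
        _signal.WaitOne();

        var length = accessor.ReadInt32(0);
        if (length <= 0 || length > BufferSize - sizeof(int))
          continue;

        var bytes = new byte[length];
        accessor.ReadArray(sizeof(int), bytes, 0, length);

        var evt = EventSerializer.Deserialize<TEvent>(Encoding.UTF8.GetString(bytes));
        onEventReceived(evt);
      }
    });
  }

  public void Dispose()
  {
    _signal.Dispose();
    _mmf.Dispose();
  }
}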

TCP/IP Transport

Uses System.Net.Sockets for network communication.

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

public class TcpTransport : IEventTransport
{
  private readonly string _host;
  private readonly int _port;

  public TcpTransport(string host, int port)
  {
    _host = host;
    _port = port;
  }

  public void Send<TEvent>(TEvent eventData)
  {
    var json = EventSerializer.Serialize(eventData);
    using var client = new TcpClient(_host, _port);
    var stream = client.GetStream();

    var bytes = Encoding.UTF8.GetBytes(json);
    stream.Write(bytes, 0, bytes.Length);
  }

  public void StartListening<TEvent>(Action<TEvent> onEventReceived)
  {
    Task.Run(() =>
    {
      var listener = new TcpListener(IPAddress.Any, _port);
      listener.Start();
      while (true)
      {
        using var client = listener.AcceptTcpClient();
        using var stream = client.GetStream();

        // The sender closes the connection after each event, so read to the end of the stream;
        // a single Read call is not guaranteed to return the whole message.
        using var reader = new StreamReader(stream, Encoding.UTF8);
        var json = reader.ReadToEnd();

        var evt = EventSerializer.Deserialize<TEvent>(json);
        onEventReceived(evt);
      }
    });
  }
}
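
TCP delivers a stream of bytes, not discrete messages. The transport above opens one connection per event, so the end of the stream marks the end of the message. If you would rather keep a connection open and send many events over it, a length prefix is a common way to mark message boundaries. The sketch below assumes a hypothetical MessageFraming helper (not part of the original transport) and uses a MemoryStream in place of a NetworkStream so it runs without a socket.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;
using System.Text;

// Usage: write two frames back to back, then read them out as separate messages.
using var stream = new MemoryStream();
MessageFraming.WriteFrame(stream, "{\"Username\":\"alice\"}");
MessageFraming.WriteFrame(stream, "{\"Username\":\"bob\"}");

stream.Position = 0;
Console.WriteLine(MessageFraming.ReadFrame(stream));
Console.WriteLine(MessageFraming.ReadFrame(stream));

// Hypothetical helper: 4-byte little-endian length followed by the UTF-8 payload
public static class MessageFraming
{
  public static void WriteFrame(Stream stream, string json)
  {
    var payload = Encoding.UTF8.GetBytes(json);
    var header = new byte[sizeof(int)];
    BinaryPrimitives.WriteInt32LittleEndian(header, payload.Length);

    stream.Write(header, 0, header.Length);
    stream.Write(payload, 0, payload.Length);
  }

  public static string ReadFrame(Stream stream)
  {
    var header = ReadExactly(stream, sizeof(int));
    var length = BinaryPrimitives.ReadInt32LittleEndian(header);
    if (length < 0) throw new InvalidDataException("Negative frame length.");

    return Encoding.UTF8.GetString(ReadExactly(stream, length));
  }

  // Stream.Read may return fewer bytes than requested, so loop until the buffer is full
  private static byte[] ReadExactly(Stream stream, int count)
  {
    var buffer = new byte[count];
    var offset = 0;
    while (offset < count)
    {
      var read = stream.Read(buffer, offset, count - offset);
      if (read == 0) throw new EndOfStreamException("Connection closed mid-frame.");
      offset += read;
    }
    return buffer;
  }
}
```

The same framing would work over a named pipe, since both expose a Stream. A production version would likely also cap the accepted frame length to avoid oversized allocations.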

Example Usage with EventAggregator

// Example Event
public class UserCreatedEvent
{
  public string Username { get; set; } = string.Empty;
}

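// Sample EventAggregator for each use-case
var aggregator = new EventAggregator();

// Enable Named Pipe IPC (call EnableIpc before Subscribe so the listener is started)
aggregator.EnableIpc(new NamedPipeTransport("MyPipe"));

// Enable Memory-Mapped IPC
// aggregator.EnableIpc(new MemoryMappedTransport("MySharedMemory"));

// Enable TCP/IP IPC
// aggregator.EnableIpc(new TcpTransport("127.0.0.1", 5000));

// In the receiving process: handle events published by other processes.
// Keep a reference to the handler; the aggregator only holds it weakly.
Action<UserCreatedEvent> onUserCreated = e => Console.WriteLine($"User created: {e.Username}");
aggregator.Subscribe(onUserCreated);

// In the sending process: notify local subscribers and forward through the transport
aggregator.Publish(new UserCreatedEvent { Username = "alice" });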