Async IPC Event Aggregator (Pt-3)
Let’s evolve our IPC Event Aggregator to support asynchronous calls and bidirectional request/response over our pluggable IPC transports. This new design preserves the weak-reference handlers, remains DI-friendly, and keeps publishers decoupled from subscribers and transport mechanics.
Note: This article is for educational purposes only.
To recap the previous blog posts: we first created a Simple Event Aggregator in C#, which used in-memory event communication within a single application. We then extended it in IPC Event Aggregator in C# (Part-2) to allow inter-process communication between multiple applications. Part-2 also opened the door to pluggable IPC transports: Named Pipes, Memory-Mapped Files, and TCP/IP Sockets.
Why Extend To Be Async?
Real World Case Study
As a real-world example, one of my previous medical robotic systems transported a very large IPC package at every startup. This payload was over 24 MB in size! Though it was large, the system was able to transmit, deserialize, and aggregate it very quickly (a few seconds) thanks to TCP/IP IPC, written in C++ and finely tuned by seasoned engineers. Yes, C++ was a HUGE help in crunching the XML payload. Later proof-of-concepts used JavaScript and Python, and they puked on themselves.
Why so large, you may ask? The payload contained nearly everything about the system, including the state machine transitions, the coordinates of all of the medications (X, Y, Z, X2, Y2, Z2, Rail), system settings, and so much more. The smallest system could hold over 99,840 medications and a fully-built system over 3,294,720 medications (that’s correct; I architected the storage system).
Later designs included a more optimized storage of the coordinate system. That’s a retrospective article for another day.
In short, if you are sending a large packet, you don’t want to lock up your application while it is in flight.
Implementation
Architecture Overview
This example implementation is intentionally straightforward for clarity. For production, add robust error handling, retries, backpressure, queueing, auth/ACLs, and schema versioning. Again, this is for educational purposes only.
EventAggregator (DI singleton):
- PublishAsync<TEvent>(TEvent eventData)
- RequestAsync<TRequest, TResponse>(TRequest request)
- Subscribe<TEvent>(Action<TEvent> handler) (one-way)
- SubscribeRequest<TRequest, TResponse>(Func<TRequest, Task<TResponse>> handler) (request/response)
IEventTransport (async, bi-directional):
- StartAsync(Func<EventEnvelope, Task> onMessageAsync, CancellationToken ct)
- SendAsync(EventEnvelope envelope, CancellationToken ct)
- ReplyAddress { get; } (used by the aggregator to populate ReplyTo for requests)
Transports:
- NamedPipeTransport (duplex via named server/client pipes + length-prefix framing)
- MemoryMappedTransport (two MMFs + named EventWaitHandles for request/response signals)
- TcpTransport (two ports, one for requests and one for responses, with length-prefix framing)
Common Types and Serialization
using System;
using System.Text.Json;
using System.Text.Json.Serialization;
public sealed class EventEnvelope
{
public string MessageId { get; set; } = Guid.NewGuid().ToString("N");
public string CorrelationId { get; set; } = Guid.NewGuid().ToString("N");
public string EventType { get; set; } = default!;
public bool IsRequest { get; set; }
public bool IsResponse { get; set; }
public string? ReplyTo { get; set; } // Transport-specific reply address (pipe name, host:port, etc.)
public DateTimeOffset Timestamp { get; set; } = DateTimeOffset.UtcNow;
public string PayloadJson { get; set; } = default!;
}
// Helpers for JSON payload and envelope
public static class EventSerializer
{
private static readonly JsonSerializerOptions Options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false,
Converters = { new JsonStringEnumConverter() }
};
public static string Serialize<T>(T obj) => JsonSerializer.Serialize(obj, Options);
public static T Deserialize<T>(string json) => JsonSerializer.Deserialize<T>(json, Options)!;
public static EventEnvelope Wrap<T>(T payload, bool isRequest, string? replyTo, string? correlationId = null)
{
return new EventEnvelope
{
MessageId = Guid.NewGuid().ToString("N"),
CorrelationId = correlationId ?? Guid.NewGuid().ToString("N"),
EventType = typeof(T).AssemblyQualifiedName!,
IsRequest = isRequest,
IsResponse = false, // always false here; the responder sets it when it builds the reply envelope
ReplyTo = replyTo,
PayloadJson = Serialize(payload),
Timestamp = DateTimeOffset.UtcNow
};
}
}
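Because the payload is written with a camelCase naming policy, whoever reads it needs matching options. The following is a minimal sketch of that round trip, not part of the original implementation; `PingEvent` and `SerializerOptionsDemo` are hypothetical names introduced only for illustration, and the options mirror the naming policy used by `EventSerializer`.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload type used only for this illustration
public class PingEvent
{
    public string? Source { get; set; }
    public int Sequence { get; set; }
}

public static class SerializerOptionsDemo
{
    // Same naming policy as EventSerializer.Options in the article
    private static readonly JsonSerializerOptions CamelCase = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    public static string Write(PingEvent evt) => JsonSerializer.Serialize(evt, CamelCase);

    // Reads with the matching options, so camelCase names bind to the PascalCase properties
    public static PingEvent ReadMatching(string json) =>
        JsonSerializer.Deserialize<PingEvent>(json, CamelCase)!;

    // Reads with default options, which compare property names case-sensitively
    public static PingEvent ReadDefault(string json) =>
        JsonSerializer.Deserialize<PingEvent>(json)!;

    public static void Main()
    {
        var json = Write(new PingEvent { Source = "robot-1", Sequence = 3 });
        Console.WriteLine(json);                             // {"source":"robot-1","sequence":3}
        Console.WriteLine(ReadMatching(json).Source);        // robot-1
        Console.WriteLine(ReadDefault(json).Source is null); // True
    }
}
```

The last line is the case to watch for: a reader that forgets the options silently gets default values rather than an exception.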
Transport Abstraction
using System;
using System.Threading;
using System.Threading.Tasks;
public interface IEventTransport
{
// Start listening for both requests and responses (as applicable)
Task StartAsync(Func<EventEnvelope, Task> onMessageAsync, CancellationToken cancellationToken = default);
// Send a message (request or response). If envelope.IsResponse == true and envelope.ReplyTo != null,
// transport sends to the reply channel; otherwise to its configured request channel.
Task SendAsync(EventEnvelope envelope, CancellationToken cancellationToken = default);
// The reply address other parties should use to send responses back to this process
string ReplyAddress { get; }
}
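One way to exercise this abstraction without any real IPC is an in-memory loopback. The sketch below is an assumption-laden illustration rather than one of the article's transports: `Envelope` and `ITransport` are trimmed stand-ins for `EventEnvelope` and `IEventTransport` so the example compiles on its own, and `LoopbackTransport` is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Trimmed stand-ins for the article's EventEnvelope and IEventTransport (assumptions for this sketch)
public sealed record Envelope(string EventType, string PayloadJson, bool IsResponse = false);

public interface ITransport
{
    Task StartAsync(Func<Envelope, Task> onMessageAsync, CancellationToken ct = default);
    Task SendAsync(Envelope envelope, CancellationToken ct = default);
    string ReplyAddress { get; }
}

// Loops every sent envelope back to the local callback through an in-memory queue
public sealed class LoopbackTransport : ITransport
{
    private readonly Channel<Envelope> _queue = Channel.CreateUnbounded<Envelope>();

    public string ReplyAddress => "loopback";

    public Task StartAsync(Func<Envelope, Task> onMessageAsync, CancellationToken ct = default)
    {
        // Background pump: read queued envelopes and hand them to the callback
        _ = Task.Run(async () =>
        {
            await foreach (var envelope in _queue.Reader.ReadAllAsync(ct))
                await onMessageAsync(envelope);
        }, ct);
        return Task.CompletedTask;
    }

    public Task SendAsync(Envelope envelope, CancellationToken ct = default)
        => _queue.Writer.WriteAsync(envelope, ct).AsTask();
}

public static class LoopbackDemo
{
    public static async Task<Envelope> SendAndReceiveAsync(Envelope envelope)
    {
        var received = new TaskCompletionSource<Envelope>(TaskCreationOptions.RunContinuationsAsynchronously);
        using var cts = new CancellationTokenSource();
        var transport = new LoopbackTransport();
        await transport.StartAsync(e => { received.TrySetResult(e); return Task.CompletedTask; }, cts.Token);
        await transport.SendAsync(envelope);
        var result = await received.Task;
        cts.Cancel(); // stop the background pump
        return result;
    }

    public static async Task Main()
    {
        var echoed = await SendAndReceiveAsync(new Envelope("Ping", "{}"));
        Console.WriteLine(echoed.EventType); // Ping
    }
}
```

A transport like this can be handy in unit tests, since the aggregator only depends on the interface and not on pipes or sockets.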
Async Event Aggregator
The following recreates the Event Aggregator to include async, and request/response, while retaining the weak references.
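Do note, the following example DOES NOT use System.Linq, as it can be less AOT friendly, is slower in some cases, and can be difficult to debug. If you want your application to be Ahead-of-Time friendly, trimming Linq is one step. The reflection used below (Type.GetType, MakeGenericMethod, and reflection-based JSON serialization) would also need attention before this code is truly AOT-ready.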
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Text.Json;
using System.Reflection;
public interface IEventAggregator
{
void Subscribe<TEvent>(Action<TEvent> handler);
void Unsubscribe<TEvent>(Action<TEvent> handler);
void SubscribeRequest<TRequest, TResponse>(Func<TRequest, Task<TResponse>> handler);
void UnsubscribeRequest<TRequest, TResponse>(Func<TRequest, Task<TResponse>> handler);
Task PublishAsync<TEvent>(TEvent eventData, CancellationToken cancellationToken = default);
Task<TResponse> RequestAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken = default);
// Configure transport
Task UseTransportAsync(IEventTransport transport, CancellationToken cancellationToken = default);
}
public class EventAggregator : IEventAggregator
{
private readonly ConcurrentDictionary<Type, List<WeakReference>> _eventSubscribers = new();
private readonly ConcurrentDictionary<Type, List<WeakReference>> _requestSubscribers = new();
private readonly ConcurrentDictionary<string, TaskCompletionSource<object?>> _pendingRequests = new();
private IEventTransport? _transport;
public async Task UseTransportAsync(IEventTransport transport, CancellationToken cancellationToken = default)
{
_transport = transport;
await _transport.StartAsync(OnTransportMessageAsync, cancellationToken);
}
public void Subscribe<TEvent>(Action<TEvent> handler)
{
var type = typeof(TEvent);
var wr = new WeakReference(handler);
_eventSubscribers.AddOrUpdate(type, _ => new List<WeakReference> { wr },
(_, list) => { list.Add(wr); return list; });
}
public void Unsubscribe<TEvent>(Action<TEvent> handler)
{
var type = typeof(TEvent);
if (_eventSubscribers.TryGetValue(type, out var list))
{
// Using System.Linq:
//// list.RemoveAll(w => w.Target is Action<TEvent> h && h == handler);
// Manual removal without LINQ
for (int i = list.Count - 1; i >= 0; i--)
{
var target = list[i].Target;
if (target is Action<TEvent> existing && existing == handler)
list.RemoveAt(i);
else if (target == null)
list.RemoveAt(i); // cleanup dead refs
}
}
}
public void SubscribeRequest<TRequest, TResponse>(Func<TRequest, Task<TResponse>> handler)
{
var type = typeof(TRequest);
var wr = new WeakReference(handler);
_requestSubscribers.AddOrUpdate(type, _ => new List<WeakReference> { wr },
(_, list) => { list.Add(wr); return list; });
}
public void UnsubscribeRequest<TRequest, TResponse>(Func<TRequest, Task<TResponse>> handler)
{
var type = typeof(TRequest);
if (_requestSubscribers.TryGetValue(type, out var list))
{
// With System.Linq:
//// list.RemoveAll(w => w.Target is Func<TRequest, Task<TResponse>> h && h == handler);
// Manual removal without LINQ
for (int i = list.Count - 1; i >= 0; i--)
{
var target = list[i].Target;
if (target is Func<TRequest, Task<TResponse>> existing && existing == handler)
list.RemoveAt(i);
else if (target == null)
list.RemoveAt(i); // cleanup dead refs
}
}
}
public async Task PublishAsync<TEvent>(TEvent eventData, CancellationToken cancellationToken = default)
{
// Local dispatch
DispatchEventLocal(eventData);
// Remote dispatch
if (_transport != null)
{
var envelope = EventSerializer.Wrap(eventData, isRequest: false, replyTo: null);
await _transport.SendAsync(envelope, cancellationToken);
}
}
public async Task<TResponse> RequestAsync<TRequest, TResponse>(
TRequest request,
CancellationToken cancellationToken = default)
{
var correlationId = Guid.NewGuid().ToString("N");
var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
_pendingRequests[correlationId] = tcs;
// Local handlers (if any) – we invoke first; if handled locally, short-circuit (no IPC)
var localHandler = GetFirstRequestHandler<TRequest, TResponse>();
if (localHandler != null)
{
var localResponse = await localHandler(request);
_pendingRequests.TryRemove(correlationId, out _);
return localResponse;
}
if (_transport == null)
{
_pendingRequests.TryRemove(correlationId, out _); // do not leak the pending entry
throw new InvalidOperationException("No transport configured for request/response.");
}
var envelope = EventSerializer.Wrap(request, isRequest: true, replyTo: _transport.ReplyAddress, correlationId);
try
{
await _transport.SendAsync(envelope, cancellationToken).ConfigureAwait(false);
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
{
var obj = await tcs.Task.ConfigureAwait(false);
if (obj is TResponse typed)
return typed;
// Try deserializing if the transport delivered JSON payload as string
if (obj is string s)
{
try { return EventSerializer.Deserialize<TResponse>(s); }
catch { /* fall through */ }
}
throw new InvalidOperationException($"Response type mismatch for correlationId={correlationId}.");
}
}
finally
{
// Always drop the entry, including on cancellation or a failed send
_pendingRequests.TryRemove(correlationId, out _);
}
}
private void DeliverLocalGeneric<T>(T payload) => DispatchEventLocal(payload);
private void DispatchEventLocal<TEvent>(TEvent eventData)
{
var type = typeof(TEvent);
if (_eventSubscribers.TryGetValue(type, out var subs))
{
////var dead = new List<WeakReference>();
////foreach (var wr in subs)
////{
//// if (wr.Target is Action<TEvent> handler)
//// handler(eventData);
//// else
//// dead.Add(wr);
////}
////
////foreach (var d in dead)
//// subs.Remove(d);
// Manual iteration; remove dead refs
for (int i = subs.Count - 1; i >= 0; i--)
{
var target = subs[i].Target;
if (target is Action<TEvent> handler)
handler(eventData);
else if (target == null)
subs.RemoveAt(i);
}
}
}
private Func<TRequest, Task<TResponse>>? GetFirstRequestHandler<TRequest, TResponse>()
{
var type = typeof(TRequest);
if (_requestSubscribers.TryGetValue(type, out var subs))
{
var dead = new List<WeakReference>();
foreach (var wr in subs)
{
if (wr.Target is Func<TRequest, Task<TResponse>> handler)
return handler;
dead.Add(wr);
}
foreach (var d in dead)
subs.Remove(d);
}
return null;
}
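// Must mirror EventSerializer's options: payloads are written with camelCase names,
// and the default options match property names case-sensitively.
private static readonly JsonSerializerOptions PayloadOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
Converters = { new System.Text.Json.Serialization.JsonStringEnumConverter() }
};
// Incoming messages from transport
private async Task OnTransportMessageAsync(EventEnvelope envelope)
{
// Responses only need the correlation id, so complete them before resolving any type;
// otherwise an unresolvable response type would leave the caller waiting forever.
if (envelope.IsResponse)
{
// Complete pending request
if (_pendingRequests.TryRemove(envelope.CorrelationId, out var tcs))
tcs.TrySetResult(envelope.PayloadJson);
return;
}
// Resolve type without LINQ
var eventType = Type.GetType(envelope.EventType, throwOnError: false);
if (eventType == null)
return;
// Deserialize dynamically, with the same options the sender used
var payloadObj = JsonSerializer.Deserialize(envelope.PayloadJson, eventType, PayloadOptions);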
if (payloadObj == null)
return;
if (envelope.IsRequest)
{
// Try to find a matching request handler (manual scan)
if (_requestSubscribers.TryGetValue(eventType, out var subs) && subs.Count > 0)
{
object? responseObj = null;
bool hasHandler = false;
// Iterate backward to safely remove dead refs during iteration
for (int i = subs.Count - 1; i >= 0; i--)
{
var target = subs[i].Target;
if (target == null)
{
subs.RemoveAt(i);
continue;
}
// Using reflection to invoke the generic delegate without LINQ/ofType
var delType = target.GetType();
// Expected: Func<TRequest, Task<TResponse>>
var invoke = delType.GetMethod("Invoke");
if (invoke != null && invoke.GetParameters().Length == 1)
{
hasHandler = true;
try
{
var taskObj = invoke.Invoke(target, new[] { payloadObj });
if (taskObj is Task t)
{
await t.ConfigureAwait(false);
// Extract Task<TResponse>.Result via reflection
var taskType = t.GetType();
var resultProp = taskType.GetProperty("Result", BindingFlags.Public | BindingFlags.Instance);
if (resultProp != null)
responseObj = resultProp.GetValue(t);
}
}
catch
{
// For demo: ignore handler exception
}
break; // Use the first handler found
}
}
if (hasHandler && responseObj != null && _transport != null && envelope.ReplyTo != null)
{
var responseEnvelope = new EventEnvelope
{
MessageId = Guid.NewGuid().ToString("N"),
CorrelationId = envelope.CorrelationId,
EventType = responseObj.GetType().AssemblyQualifiedName!,
IsRequest = false,
IsResponse = true,
ReplyTo = envelope.ReplyTo,
Timestamp = DateTimeOffset.UtcNow,
PayloadJson = EventSerializer.Serialize(responseObj)
};
await _transport.SendAsync(responseEnvelope).ConfigureAwait(false);
}
}
// else: no handler; drop or log
return;
}
// One-way publish – deliver locally with closed generic
var deliverMethod = typeof(EventAggregator).GetMethod(nameof(DeliverLocalGeneric),
BindingFlags.NonPublic | BindingFlags.Instance);
var closed = deliverMethod!.MakeGenericMethod(eventType);
closed.Invoke(this, new[] { payloadObj });
}
/*
//// The following relies on System.Linq. Kept this example here for comparison
// Incoming messages from transport
private async Task OnTransportMessageAsync(EventEnvelope envelope)
{
// Resolve type
var eventType = Type.GetType(envelope.EventType, throwOnError: false);
if (eventType == null)
return;
if (envelope.IsResponse)
{
// Complete pending request
// Aggregator caller will deserialize to expected TResponse
if (_pendingRequests.TryRemove(envelope.CorrelationId, out var tcs))
tcs.TrySetResult(envelope.PayloadJson);
return;
}
// Request or Publish
var payloadObj = JsonSerializer.Deserialize(envelope.PayloadJson, eventType)!;
if (payloadObj is null)
return;
if (envelope.IsRequest)
{
// Find request handler and send response
var handlerList = _requestSubscribers.TryGetValue(eventType, out var subs) ? subs : null;
var handler = handlerList?.Select(w => w.Target).OfType<dynamic>().FirstOrDefault();
if (handler is null)
return; // no handler – drop or log
object response;
try
{
// Invoke dynamically
response = await handler((dynamic)payloadObj);
}
catch
{
// For brevity, ignoring error propagation
return;
}
if (_transport != null && envelope.ReplyTo != null)
{
var responseEnvelope = new EventEnvelope
{
MessageId = Guid.NewGuid().ToString("N"),
CorrelationId = envelope.CorrelationId,
EventType = response.GetType().AssemblyQualifiedName!,
IsRequest = false,
IsResponse = true,
ReplyTo = envelope.ReplyTo, // used by transport to route back to sender
Timestamp = DateTimeOffset.UtcNow,
PayloadJson = EventSerializer.Serialize(response),
};
await _transport.SendAsync(responseEnvelope);
}
return;
}
// One-way publish – deliver locally
// Use closed generic method
var deliverMethod = typeof(EventAggregator).GetMethod(nameof(DeliverLocalGeneric), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
var closed = deliverMethod!.MakeGenericMethod(eventType);
closed.Invoke(this, new[] { payloadObj });
}
*/
}
Named Pipes Transport (Async + Bi-Directional)
The named pipes pattern includes the following. Framing is a 4-byte length prefix (little-endian), then JSON.
- Process A
  - Listens on incomingPipeName for requests.
  - Sends responses to the replyPipeName advertised by Process B.
- Process B
  - Does the inverse of Process A.
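The framing rule can be tried in isolation before any pipe is involved. Here is a minimal sketch over a MemoryStream; `FramingDemo` and its helpers are hypothetical names, and it uses `BinaryPrimitives` to pin the byte order explicitly, whereas the transport code in this article uses `BitConverter`, which follows the machine's byte order.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;
using System.Text;

public static class FramingDemo
{
    // Writes one frame: 4-byte little-endian length, then the UTF-8 body
    public static void WriteFrame(Stream stream, string body)
    {
        var payload = Encoding.UTF8.GetBytes(body);
        Span<byte> prefix = stackalloc byte[4];
        BinaryPrimitives.WriteInt32LittleEndian(prefix, payload.Length);
        stream.Write(prefix);
        stream.Write(payload);
    }

    // Reads one frame, or returns null when the stream ends before a full frame arrives
    public static string? ReadFrame(Stream stream)
    {
        var prefix = new byte[4];
        if (!ReadFully(stream, prefix)) return null;
        var length = BinaryPrimitives.ReadInt32LittleEndian(prefix);
        if (length < 0) return null; // corrupt prefix
        var payload = new byte[length];
        if (!ReadFully(stream, payload)) return null;
        return Encoding.UTF8.GetString(payload);
    }

    // Stream.Read may return fewer bytes than requested, so loop until the buffer is full
    private static bool ReadFully(Stream stream, byte[] buffer)
    {
        var total = 0;
        while (total < buffer.Length)
        {
            var read = stream.Read(buffer, total, buffer.Length - total);
            if (read == 0) return false;
            total += read;
        }
        return true;
    }

    public static void Main()
    {
        using var stream = new MemoryStream();
        WriteFrame(stream, "{\"a\":1}");
        WriteFrame(stream, "{\"b\":2}");
        stream.Position = 0;
        Console.WriteLine(ReadFrame(stream));         // {"a":1}
        Console.WriteLine(ReadFrame(stream));         // {"b":2}
        Console.WriteLine(ReadFrame(stream) is null); // True
    }
}
```

The length prefix is what lets two back-to-back messages on the same byte stream be separated again on the reading side.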
using System;
using System.IO;
using System.IO.Pipes;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
public class NamedPipeTransport : IEventTransport
{
private readonly string _incomingPipeName; // Where this process listens for requests/publish
private readonly string _outgoingPipeName; // Where this process sends requests/publish to the other side
private readonly string _replyPipeName; // Where this process listens for responses (advertised as ReplyAddress)
public NamedPipeTransport(string incomingPipeName, string outgoingPipeName, string replyPipeName)
{
_incomingPipeName = incomingPipeName;
_outgoingPipeName = outgoingPipeName;
_replyPipeName = replyPipeName;
}
public string ReplyAddress => _replyPipeName;
public async Task StartAsync(Func<EventEnvelope, Task> onMessageAsync, CancellationToken cancellationToken = default)
{
// Start two server loops: requests (incoming) and responses (reply)
_ = Task.Run(() => ServerLoopAsync(_incomingPipeName, onMessageAsync, cancellationToken), cancellationToken);
_ = Task.Run(() => ServerLoopAsync(_replyPipeName, onMessageAsync, cancellationToken), cancellationToken);
await Task.CompletedTask;
}
public async Task SendAsync(EventEnvelope envelope, CancellationToken cancellationToken = default)
{
var targetPipe = envelope.IsResponse && !string.IsNullOrEmpty(envelope.ReplyTo)
? envelope.ReplyTo!
: _outgoingPipeName;
await ClientSendAsync(targetPipe, envelope, cancellationToken).ConfigureAwait(false);
}
private static async Task ServerLoopAsync(string pipeName, Func<EventEnvelope, Task> onMessageAsync, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
using var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
await server.WaitForConnectionAsync(ct).ConfigureAwait(false);
try
{
// Read multiple messages on this connection
while (server.IsConnected && !ct.IsCancellationRequested)
{
var envelope = await ReadEnvelopeAsync(server, ct).ConfigureAwait(false);
if (envelope == null) break;
await onMessageAsync(envelope).ConfigureAwait(false);
}
}
catch { /* swallow for demo */ }
finally
{
try { server.Disconnect(); } catch { /* ignore */ }
}
}
}
private static async Task ClientSendAsync(string pipeName, EventEnvelope envelope, CancellationToken ct)
{
using var client = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, PipeOptions.Asynchronous);
await client.ConnectAsync(ct).ConfigureAwait(false);
await WriteEnvelopeAsync(client, envelope, ct).ConfigureAwait(false);
await client.FlushAsync(ct).ConfigureAwait(false);
}
private static async Task WriteEnvelopeAsync(Stream stream, EventEnvelope envelope, CancellationToken ct)
{
var json = JsonSerializer.Serialize(envelope);
var payload = Encoding.UTF8.GetBytes(json);
var lengthPrefix = BitConverter.GetBytes(payload.Length);
await stream.WriteAsync(lengthPrefix.AsMemory(0, 4), ct);
await stream.WriteAsync(payload.AsMemory(), ct);
}
private static async Task<EventEnvelope?> ReadEnvelopeAsync(Stream stream, CancellationToken ct)
{
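var lenBuf = new byte[4];
var read = 0;
// A single ReadAsync may return fewer than 4 bytes, so loop until the prefix is complete
while (read < 4)
{
var n = await stream.ReadAsync(lenBuf.AsMemory(read, 4 - read), ct).ConfigureAwait(false);
if (n == 0) return null; // stream closed before a full prefix arrived
read += n;
}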
var length = BitConverter.ToInt32(lenBuf, 0);
var payload = new byte[length];
var total = 0;
while (total < length)
{
var r = await stream.ReadAsync(payload.AsMemory(total, length - total), ct).ConfigureAwait(false);
if (r == 0) break;
total += r;
}
if (total < length) return null;
var json = Encoding.UTF8.GetString(payload);
return JsonSerializer.Deserialize<EventEnvelope>(json);
}
}
Memory-Mapped File Transport
The pattern includes the following. This simple demo has a single message buffer (overwrite), and no multi-producer queueing.
- Two MMFs: requestMapName, responseMapName
- Two EventWaitHandles: requestSignalName, responseSignalName
- Writer writes JSON, then signals.
- Reader waits, reads, and dispatches.
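To make the single-slot (overwrite) behavior concrete, here is a minimal sketch of the slot layout on its own, without the signalling. `SingleSlotDemo` is a hypothetical name, and the sketch uses an unnamed map to stay portable, whereas the transport below relies on named maps and named wait handles.

```csharp
using System;
using System.IO.MemoryMappedFiles;
using System.Text;

public static class SingleSlotDemo
{
    // Writes [int32 length][UTF-8 bytes] at the start of the view, overwriting any previous message
    public static void WriteSlot(MemoryMappedViewAccessor accessor, string json)
    {
        var bytes = Encoding.UTF8.GetBytes(json);
        if (bytes.Length + 4 > accessor.Capacity)
            throw new InvalidOperationException("Payload too large for the slot.");
        accessor.Write(0, bytes.Length);
        accessor.WriteArray(4, bytes, 0, bytes.Length);
    }

    // Reads the current message, or null when the slot is empty or the length is implausible
    public static string? ReadSlot(MemoryMappedViewAccessor accessor)
    {
        var length = accessor.ReadInt32(0);
        if (length <= 0 || length > accessor.Capacity - 4) return null;
        var buffer = new byte[length];
        accessor.ReadArray(4, buffer, 0, length);
        return Encoding.UTF8.GetString(buffer);
    }

    public static void Main()
    {
        // An unnamed map keeps the sketch portable; the article's transport uses named maps
        using var mmf = MemoryMappedFile.CreateNew(null, 4096);
        using var accessor = mmf.CreateViewAccessor();
        WriteSlot(accessor, "first");
        WriteSlot(accessor, "second"); // overwrites "first" before anyone read it
        Console.WriteLine(ReadSlot(accessor)); // second
    }
}
```

Note that "first" is lost: with one slot, a reader that is slower than the writer misses messages, which is why the notes at the end suggest a ring buffer for anything beyond a demo.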
using System;
using System.IO.MemoryMappedFiles;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
public class MemoryMappedTransport : IEventTransport
{
private readonly string _requestMapName;
private readonly string _responseMapName;
private readonly string _requestSignalName;
private readonly string _responseSignalName;
private const int BufferSize = 1 << 20; // 1MB buffer for demo
public MemoryMappedTransport(
string requestMapName, string responseMapName,
string requestSignalName, string responseSignalName)
{
_requestMapName = requestMapName;
_responseMapName = responseMapName;
_requestSignalName = requestSignalName;
_responseSignalName = responseSignalName;
}
/// <summary>Semantic only; actual routing uses known map/signal.</summary>
public string ReplyAddress => _responseMapName;
/// <summary>Start listeners for request and response channels.</summary>
public async Task StartAsync(Func<EventEnvelope, Task> onMessageAsync, CancellationToken cancellationToken = default)
{
_ = Task.Run(() => MapListenerAsync(_requestMapName, _requestSignalName, onMessageAsync, cancellationToken), cancellationToken);
_ = Task.Run(() => MapListenerAsync(_responseMapName, _responseSignalName, onMessageAsync, cancellationToken), cancellationToken);
await Task.CompletedTask;
}
public async Task SendAsync(EventEnvelope envelope, CancellationToken cancellationToken = default)
{
var mapName = envelope.IsResponse ? _responseMapName : _requestMapName;
var signalName = envelope.IsResponse ? _responseSignalName : _requestSignalName;
using var mmf = MemoryMappedFile.CreateOrOpen(mapName, BufferSize);
using var accessor = mmf.CreateViewAccessor();
var json = JsonSerializer.Serialize(envelope);
var bytes = Encoding.UTF8.GetBytes(json);
if (bytes.Length + 4 > BufferSize)
throw new InvalidOperationException("Payload too large for MMF demo buffer.");
accessor.Write(0, bytes.Length);
accessor.WriteArray(4, bytes, 0, bytes.Length);
using var ev = new EventWaitHandle(false, EventResetMode.AutoReset, signalName);
ev.Set();
await Task.CompletedTask;
}
private static async Task MapListenerAsync(string mapName, string signalName, Func<EventEnvelope, Task> onMessageAsync, CancellationToken ct)
{
using var ev = new EventWaitHandle(false, EventResetMode.AutoReset, signalName);
while (!ct.IsCancellationRequested)
{
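// Wait for a signal, but also wake up when cancellation is requested
if (WaitHandle.WaitAny(new WaitHandle[] { ev, ct.WaitHandle }) == 1)
break;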
try
{
using var mmf = MemoryMappedFile.CreateOrOpen(mapName, BufferSize);
using var accessor = mmf.CreateViewAccessor();
var length = accessor.ReadInt32(0);
if (length <= 0 || length > BufferSize - 4)
continue;
var buffer = new byte[length];
accessor.ReadArray(4, buffer, 0, length);
var json = Encoding.UTF8.GetString(buffer);
var envelope = JsonSerializer.Deserialize<EventEnvelope>(json);
if (envelope != null)
{
await onMessageAsync(envelope).ConfigureAwait(false);
}
}
catch { /* swallow for demo */ }
}
}
}
TCP/IP Transport
This pattern includes:
- Server listens on requestPort for requests/publishes.
- Responses are sent to replyPort where the requester listens.
- Message framing: 4-byte length prefix + JSON.
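The connect-per-send pattern with framing can be tried end to end on the loopback interface. The following is a minimal sketch, assuming a single message per connection; `TcpFramingDemo` and its helpers are hypothetical names, and port 0 is used so the OS picks a free port rather than the fixed ports shown later in this article.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

public static class TcpFramingDemo
{
    // Sends one length-prefixed frame over a fresh connection
    public static async Task SendFrameAsync(IPEndPoint target, string body)
    {
        using var client = new TcpClient();
        await client.ConnectAsync(target.Address, target.Port);
        var payload = Encoding.UTF8.GetBytes(body);
        var stream = client.GetStream();
        await stream.WriteAsync(BitConverter.GetBytes(payload.Length));
        await stream.WriteAsync(payload);
    }

    // Accepts one connection and reads one frame from it
    public static async Task<string?> ReceiveFrameAsync(TcpListener listener)
    {
        using var client = await listener.AcceptTcpClientAsync();
        var stream = client.GetStream();
        var prefix = new byte[4];
        if (!await ReadFullyAsync(stream, prefix)) return null;
        var length = BitConverter.ToInt32(prefix, 0);
        if (length < 0) return null; // corrupt prefix
        var payload = new byte[length];
        if (!await ReadFullyAsync(stream, payload)) return null;
        return Encoding.UTF8.GetString(payload);
    }

    // TCP may deliver a frame in several chunks, so keep reading until the buffer is full
    private static async Task<bool> ReadFullyAsync(NetworkStream stream, byte[] buffer)
    {
        var total = 0;
        while (total < buffer.Length)
        {
            var read = await stream.ReadAsync(buffer.AsMemory(total, buffer.Length - total));
            if (read == 0) return false;
            total += read;
        }
        return true;
    }

    public static async Task<string?> RoundTripAsync(string body)
    {
        var listener = new TcpListener(IPAddress.Loopback, 0); // port 0: let the OS pick a free port
        listener.Start();
        try
        {
            var receive = ReceiveFrameAsync(listener);
            await SendFrameAsync((IPEndPoint)listener.LocalEndpoint, body);
            return await receive;
        }
        finally { listener.Stop(); }
    }

    public static async Task Main()
        => Console.WriteLine(await RoundTripAsync("{\"hello\":\"tcp\"}")); // {"hello":"tcp"}
}
```

Opening a connection per message keeps the sketch simple; a long-lived connection would avoid the repeated handshake for chatty workloads.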
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
public class TcpTransport : IEventTransport
{
private readonly IPEndPoint _requestListen;
private readonly IPEndPoint _replyListen;
/// <summary>Where to send requests/publishes</summary>
private readonly IPEndPoint _requestSend;
/// <summary>Where to send responses</summary>
private readonly IPEndPoint _replySend;
public TcpTransport(
IPEndPoint requestListen,
IPEndPoint replyListen,
IPEndPoint requestSend,
IPEndPoint replySend)
{
_requestListen = requestListen;
_replyListen = replyListen;
_requestSend = requestSend;
_replySend = replySend;
}
public string ReplyAddress => $"{_replyListen.Address}:{_replyListen.Port}";
public async Task StartAsync(
Func<EventEnvelope, Task> onMessageAsync,
CancellationToken cancellationToken = default)
{
_ = Task.Run(() => ListenLoopAsync(_requestListen, onMessageAsync, cancellationToken), cancellationToken);
_ = Task.Run(() => ListenLoopAsync(_replyListen, onMessageAsync, cancellationToken), cancellationToken);
await Task.CompletedTask;
}
public async Task SendAsync(EventEnvelope envelope, CancellationToken cancellationToken = default)
{
var target = envelope.IsResponse ? _replySend : _requestSend;
using var client = new TcpClient();
await client.ConnectAsync(target.Address, target.Port, cancellationToken)
.ConfigureAwait(false);
using var stream = client.GetStream();
await WriteEnvelopeAsync(stream, envelope, cancellationToken).ConfigureAwait(false);
await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task ListenLoopAsync(
IPEndPoint endpoint,
Func<EventEnvelope, Task> onMessageAsync,
CancellationToken ct)
{
var listener = new TcpListener(endpoint);
listener.Start();
try
{
while (!ct.IsCancellationRequested)
{
var client = await listener.AcceptTcpClientAsync(ct)
.ConfigureAwait(false);
_ = Task.Run(async () =>
{
using var c = client;
using var s = c.GetStream();
while (!ct.IsCancellationRequested)
{
var envelope = await ReadEnvelopeAsync(s, ct).ConfigureAwait(false);
if (envelope == null)
break;
await onMessageAsync(envelope).ConfigureAwait(false);
}
}, ct);
}
}
catch { /* swallow for demo */ }
finally
{
try { listener.Stop(); } catch { }
}
}
private static async Task WriteEnvelopeAsync(
NetworkStream stream,
EventEnvelope envelope, CancellationToken ct)
{
var json = JsonSerializer.Serialize(envelope);
var bytes = Encoding.UTF8.GetBytes(json);
var len = BitConverter.GetBytes(bytes.Length);
await stream.WriteAsync(len.AsMemory(0, 4), ct);
await stream.WriteAsync(bytes.AsMemory(), ct);
}
private static async Task<EventEnvelope?> ReadEnvelopeAsync(NetworkStream stream, CancellationToken ct)
{
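var lenBuf = new byte[4];
var read = 0;
// A single ReadAsync may return fewer than 4 bytes, so loop until the prefix is complete
while (read < 4)
{
var n = await stream.ReadAsync(lenBuf.AsMemory(read, 4 - read), ct)
.ConfigureAwait(false);
if (n == 0)
return null; // connection closed before a full prefix arrived
read += n;
}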
var length = BitConverter.ToInt32(lenBuf, 0);
var payload = new byte[length];
var total = 0;
while (total < length)
{
var r = await stream.ReadAsync(payload.AsMemory(total, length - total), ct)
.ConfigureAwait(false);
if (r == 0) break;
total += r;
}
if (total < length)
return null;
var json = Encoding.UTF8.GetString(payload);
return JsonSerializer.Deserialize<EventEnvelope>(json);
}
}
Example Implementation
DI Registration (ASP.NET Core)
using Microsoft.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Builder;
using System.Net;
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// Choose ONE transport and register it + aggregator
builder.Services.AddSingleton<IEventAggregator, EventAggregator>();
// Example: Named Pipes (client-side perspective)
builder.Services.AddSingleton<IEventTransport>(sp =>
new NamedPipeTransport(
incomingPipeName: "client-requests-in", // this process listens (if needed)
outgoingPipeName: "server-requests-in", // send requests/publishes to server
replyPipeName: "client-replies-in")); // this process listens for responses
// Example: Memory-Mapped Files
// builder.Services.AddSingleton<IEventTransport>(sp =>
// new MemoryMappedTransport("req-map", "resp-map", "req-signal", "resp-signal"));
// Example: TCP
// builder.Services.AddSingleton<IEventTransport>(sp =>
// new TcpTransport(
// requestListen: new IPEndPoint(IPAddress.Loopback, 5001),
// replyListen: new IPEndPoint(IPAddress.Loopback, 5002),
// requestSend: new IPEndPoint(IPAddress.Loopback, 5000), // other side request port
// replySend: new IPEndPoint(IPAddress.Loopback, 5003))); // other side reply port
var app = builder.Build();
var aggregator = app.Services.GetRequiredService<IEventAggregator>();
var transport = app.Services.GetRequiredService<IEventTransport>();
await aggregator.UseTransportAsync(transport);
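// Sample subscriptions.
// Handlers are held through WeakReference, so a delegate that nothing else references
// (for example, a lambda that captures local state) may be collected; keep a strong
// reference to any handler that must stay subscribed.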
aggregator.Subscribe<UserCreatedEvent>(e => Console.WriteLine($"[Local] User created: {e.Username}"));
aggregator.SubscribeRequest<GetUserRequest, GetUserResponse>(async req =>
{
await Task.Delay(25);
return new GetUserResponse { Username = req.Username, Exists = true };
});
app.Run();
// Sample event types
public record UserCreatedEvent(string Username);
public record GetUserRequest(string Username);
public record GetUserResponse { public string Username { get; set; } = "";
public bool Exists { get; set; } }
Publishing and Requesting
public class UserService
{
private readonly IEventAggregator _aggregator;
public UserService(IEventAggregator aggregator) => _aggregator = aggregator;
public async Task CreateAsync(string username, CancellationToken ct = default)
{
await _aggregator.PublishAsync(new UserCreatedEvent(username), ct);
}
public async Task<bool> CheckUserExistsAsync(string username, CancellationToken ct = default)
{
var resp = await _aggregator.RequestAsync<GetUserRequest, GetUserResponse>(
new GetUserRequest(username), ct);
return resp.Exists;
}
}
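A request that never gets a reply would otherwise wait indefinitely, so callers may want to bound the wait. The sketch below shows one way to do that with a linked `CancellationTokenSource`; it is a standalone illustration where a `TaskCompletionSource` stands in for the pending reply, and `RequestTimeoutDemo` and `AwaitReplyAsync` are hypothetical names. With the aggregator in this article, the equivalent would be passing the linked token into `RequestAsync`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RequestTimeoutDemo
{
    // Awaits a pending reply, giving up when the timeout elapses or the caller cancels
    public static async Task<string> AwaitReplyAsync(
        TaskCompletionSource<string> pending, TimeSpan timeout, CancellationToken ct = default)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        cts.CancelAfter(timeout);
        using (cts.Token.Register(() => pending.TrySetCanceled(cts.Token)))
        {
            return await pending.Task.ConfigureAwait(false);
        }
    }

    public static async Task Main()
    {
        // A reply that arrives in time
        var answered = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        answered.TrySetResult("pong");
        Console.WriteLine(await AwaitReplyAsync(answered, TimeSpan.FromSeconds(1))); // pong

        // A reply that never arrives
        var silent = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        try
        {
            await AwaitReplyAsync(silent, TimeSpan.FromMilliseconds(100));
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("request timed out");
        }
    }
}
```

Linking the timeout to the caller's token means either source of cancellation ends the wait, and the caller sees a single exception type for both.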
Notes and Considerations
- Reliability:
  - These demos are minimalistic for example purposes.
  - For durable queues, consider MSMQ/Service Bus/RabbitMQ/Kafka depending on needs.
- Security:
  - Named Pipes can use ACLs; TCP needs TLS + auth; MMFs need OS ACLs.
  - Add validation and schema versioning.
- Backpressure & Flow Control:
  - Implement bounded queues, retry, and exponential backoff where applicable.
- Type Resolution:
  - AssemblyQualifiedName assumes shared assemblies across processes.
  - Consider a type registry or message contracts package shared by both sides.
- Framing:
  - Length-prefix framing prevents stream-boundary issues; keep consistent across transports.
- Concurrency:
  - The Memory-Mapped example is single-slot; for multiple writers/readers, implement a ring buffer or per-message files + directory watcher.
- Cancellations:
  - Add cancellation-aware timeouts and retry policies.
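As an illustration of the type registry idea from the Type Resolution note, here is a minimal sketch. Everything in it is hypothetical: `MessageTypeRegistry`, the wire names, and the two message records are introduced only for the example, and wiring it into the envelope's EventType is left as an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message contracts used only for this illustration
public record UserCreated(string Username);
public record GetUser(string Username);

// Maps stable wire names to CLR types so processes need not share assembly-qualified names
public sealed class MessageTypeRegistry
{
    private readonly Dictionary<string, Type> _byName = new(StringComparer.Ordinal);
    private readonly Dictionary<Type, string> _byType = new();

    public MessageTypeRegistry Register<T>(string wireName)
    {
        _byName.Add(wireName, typeof(T));
        _byType.Add(typeof(T), wireName);
        return this;
    }

    // Name to put in the envelope's EventType when sending
    public string NameOf(Type type) =>
        _byType.TryGetValue(type, out var name)
            ? name
            : throw new InvalidOperationException($"Unregistered message type: {type}");

    // Type to deserialize into when receiving; null for unknown names
    public Type? Resolve(string wireName) =>
        _byName.TryGetValue(wireName, out var type) ? type : null;
}

public static class RegistryDemo
{
    public static void Main()
    {
        var registry = new MessageTypeRegistry()
            .Register<UserCreated>("user.created.v1")
            .Register<GetUser>("user.get.v1");

        Console.WriteLine(registry.NameOf(typeof(UserCreated)));  // user.created.v1
        Console.WriteLine(registry.Resolve("user.get.v1")?.Name); // GetUser
        Console.WriteLine(registry.Resolve("unknown") is null);   // True
    }
}
```

An explicit registry also doubles as an allow-list: only registered names are ever turned into types, and a version suffix in the wire name gives schema versioning a natural place to live.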
Copyright 2025 Xeno Innovations, Inc.