Dotnet(Blazor) + Postgres: Stream table CRUD notifications using NOTIFY/LISTEN
This post is inspired by this other post with a couple of performance improvements: we will use DB Context Factory instead of Npgsql, and bundle notifications to prevent backpressure condition.
First, let’s look at this query we need to run. It creates a trigger function that will dispatch our notification whenever a new row is created, updated, or deleted. To prevent “payload too large” exception from Postgres, we will reduce the payload only to a primary key in our record. You can add other columns that suit your needs.
CREATE OR REPLACE FUNCTION notify_trigger()
RETURNS TRIGGER AS $trigger$
DECLARE
rec RECORD;
payload TEXT;
channel_name TEXT;
payload_items json;
BEGIN
-- Get the Operation
CASE TG_OP
WHEN 'INSERT','UPDATE' THEN
rec := NEW;
WHEN 'DELETE' THEN
rec := OLD;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
-- Get the Channel Name
IF TG_ARGV[0] IS NULL THEN
RAISE EXCEPTION 'A Channel Name is required as first argument';
END IF;
channel_name := TG_ARGV[0];
-- Get the payload: be mindful of notification size limits!
-- If your table has too many columns or some of them are of type json(b)
-- it's better to just pass primary key to avoid "payload too large" exception
-- payload_items := row_to_json(rec); -- this line is for lightweight tables,
-- otherwise:
payload_items := json_build_object(
'id', rec."ID");
-- Build the payload
payload := json_build_object(
'timestamp', CURRENT_TIMESTAMP
, 'row_version', rec.xmin
, 'operation', TG_OP
, 'schema', TG_TABLE_SCHEMA
, 'table', TG_TABLE_NAME
, 'payload', payload_items);
-- Notify the channel
PERFORM pg_notify(channel_name, payload);
RETURN rec;
END;
Next, we add the actual trigger to call that function:
CREATE OR REPLACE TRIGGER my_trigger
AFTER INSERT OR UPDATE OR DELETE ON public."MyTable"
FOR EACH ROW EXECUTE PROCEDURE notify_trigger('my_table_event');
Now, let’s move on to our dotnet project and add all necessary services. For convenience, I will merge all pieces of the puzzle into just a handful of files. You can split it to your liking. First, we heed a handler. Instead of processing notifications one-by-one as they come in, we will bundle them into a list.
namespace Project.Services
{
/// <summary>
/// A Notification received from Postgres NOTIFY / LISTEN.
/// </summary>
public record PostgresNotification
{
public required int PID { get; set; }
public required string Channel { get; set; }
public required string Payload { get; set; }
}
/// <summary>
/// Handles Notifications received from a Postgres Channel.
/// </summary>
public interface IPostgresNotificationHandler
{
public event EventHandler<List<PostgresNotification>>? NotificationReceived;
ValueTask HandleNotificationAsync(List<PostgresNotification> notifications, CancellationToken cancellationToken);
}
public class NotificationHandlerImpl : IPostgresNotificationHandler
{
private readonly ILogger<NotificationHandlerImpl> _logger;
public NotificationHandlerImpl(ILogger<NotificationHandlerImpl> logger)
{
_logger = logger;
}
public event EventHandler<List<PostgresNotification>>? NotificationReceived;
public ValueTask HandleNotificationAsync(List<PostgresNotification> notifications, CancellationToken cancellationToken)
{
// add loging, if necessary
NotificationReceived?.Invoke(this, notifications);
return ValueTask.CompletedTask;
}
}
}
The handler will be called from a hosted background service. In this example, we will be using already existing IDbContextFactory
to get the connection and a ConcurrentQueue
to bundle notifications and dispatch them through the handler once a second. This should reduce the risk of backpressure and make sure our UI doesn’t hang or jitter.
namespace Project.Services
{
/// <summary>
/// Options to configure the <see cref="PostgresNotificationService"/>.
/// </summary>
public class PostgresNotificationServiceOptions
{
public required string ChannelName { get; set; }
public int MaxCapacity { get; set; } = 100; // Maximum Capacity of unhandled Notifications.
}
/// <summary>
/// This Service waits for Notifications received on a given Postgres Channel name.
/// </summary>
public class PostgresNotificationService : BackgroundService
{
private readonly ILogger<PostgresNotificationService> _logger;
private readonly PostgresNotificationServiceOptions _options;
private readonly IPostgresNotificationHandler _postgresNotificationHandler;
private readonly IDbContextFactory<DbContextImpl> _dbContextFactory;
private readonly ConcurrentQueue<PostgresNotification> _notificationQueue = new();
public PostgresNotificationService(ILogger<PostgresNotificationService> logger,
IOptions<PostgresNotificationServiceOptions> options,
IDbContextFactory<DbContextImpl> dbContextFactory,
IPostgresNotificationHandler postgresNotificationHandler)
{
_logger = logger;
_options = options.Value;
_dbContextFactory = dbContextFactory;
_postgresNotificationHandler = postgresNotificationHandler;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var channel = Channel.CreateBounded<PostgresNotification>(new BoundedChannelOptions(_options.MaxCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});
await Task.WhenAny(
SetupPostgresAsync(channel, stoppingToken),
ProcessChannelAsync(channel, stoppingToken),
DispatchNotificationsAsync(stoppingToken))
.ConfigureAwait(false);
}
// Initializes the Postgres Listener by issueing a LISTEN Command.
async Task SetupPostgresAsync(Channel<PostgresNotification> channel,
CancellationToken cancellationToken)
{
await using var dbContext = await _dbContextFactory.CreateDbContextAsync();
// When using Postgres in dotnet, your dbContext is actually an instance of `NpgsqlConnection`
var connection = dbContext.Database.GetDbConnection() as NpgsqlConnection;
await connection!.OpenAsync(cancellationToken);
connection.Notification += (sender, x) =>
{
var notification = new PostgresNotification
{
Channel = x.Channel,
PID = x.PID,
Payload = x.Payload,
};
channel.Writer.TryWrite(notification);
};
using (var command = new NpgsqlCommand($"LISTEN {_options.ChannelName}", connection))
{
await command
.ExecuteNonQueryAsync(cancellationToken)
.ConfigureAwait(false);
}
// And now we are putting the Connection into the Wait State,
// until the Cancellation is requested.
while (!cancellationToken.IsCancellationRequested)
{
await connection
.WaitAsync(cancellationToken)
.ConfigureAwait(false);
}
}
// Add incoming notifications to the queue for reading
async Task ProcessChannelAsync(Channel<PostgresNotification> channel, CancellationToken cancellationToken)
{
await foreach (var notification in channel.Reader.ReadAllAsync(cancellationToken))
{
_notificationQueue.Enqueue(notification);
}
}
// Dispatch notificatione every so often (i.e. 1 second), if any exist in a queue
private async Task DispatchNotificationsAsync(CancellationToken stoppingToken)
{
while(!stoppingToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
var notifications = new List<PostgresNotification>();
while(_notificationQueue.TryDequeue(out var notification))
{
notificaitons.Add(notification);
}
if(notifications.Count != 0)
{
// log, if needed
await _postgresNotificationHandler.HandleNotificationsAsync(notifications, stoppingToken);
}
}
}
}
}
Now we need to register everything with DI:
builder.Services.AddSingleton<IPostgresNotificationHandler, NotificationHandlerImpl>();
builder.Services.Configure<PostgresNotificationServiceOptions>(o => o.ChannelName = "my_table_event");
builder.Services.AddHostedService<PostgresNotificationService>();
And use it in our Blazor component:
@code
{
[Inject] public required IPostgresNotificationHandler NotificationHandler;
protected override void OnInitialized()
{
NotificaitonHandler.NotificationReceived += PgNotificationHandler
}
public void Dispose()
{
NotificationHandler.NotificationReceived -= PgNotificationHandler
}
private async void PgNotificationHandler(object? sender, List<PostgresNotification> ns)
{
// ...
// your processing logic here
// ...
StateHasChanged();
}
}