New Westminster, BC, Canada
+1(902) 601-1902

Dotnet(Blazor) + Postgres: Stream table CRUD notifications using NOTIFY/LISTEN

Dotnet(Blazor) + Postgres: Stream table CRUD notifications using NOTIFY/LISTEN

This post is inspired by this other post with a couple of performance improvements: we will use DB Context Factory instead of Npgsql, and bundle notifications to prevent backpressure condition.

First, let’s look at this query we need to run. It creates a trigger function that will dispatch our notification whenever a new row is created, updated, or deleted. To prevent “payload too large” exception from Postgres, we will reduce the payload only to a primary key in our record. You can add other columns that suit your needs.

CREATE OR REPLACE FUNCTION notify_trigger() 
RETURNS TRIGGER AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  channel_name TEXT;
  payload_items json;
BEGIN

  -- Get the Operation
  CASE TG_OP
  WHEN 'INSERT','UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
    RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  -- Get the Channel Name
  IF TG_ARGV[0] IS NULL THEN
    RAISE EXCEPTION 'A Channel Name is required as first argument';
  END IF;

  channel_name := TG_ARGV[0];

  -- Get the payload: be mindful of notification size limits!
  -- If your table has too many columns or some of them are of type json(b)
  -- it's better to just pass primary key to avoid "payload too large" exception
  -- payload_items := row_to_json(rec); -- this line is for lightweight tables, 
  -- otherwise:
  payload_items := json_build_object(
          'id', rec."ID");

  -- Build the payload
  payload := json_build_object(
      'timestamp', CURRENT_TIMESTAMP
    , 'row_version', rec.xmin
    , 'operation', TG_OP
    , 'schema', TG_TABLE_SCHEMA
    , 'table', TG_TABLE_NAME
    , 'payload', payload_items);

  -- Notify the channel
  PERFORM pg_notify(channel_name, payload);

  RETURN rec;
END;

Next, we add the actual trigger to call that function:

CREATE OR REPLACE TRIGGER my_trigger
AFTER INSERT OR UPDATE OR DELETE ON public."MyTable"
FOR EACH ROW EXECUTE PROCEDURE notify_trigger('my_table_event');

Now, let’s move on to our dotnet project and add all necessary services. For convenience, I will merge all pieces of the puzzle into just a handful of files. You can split it to your liking. First, we heed a handler. Instead of processing notifications one-by-one as they come in, we will bundle them into a list.

namespace Project.Services
{
    /// <summary>
    /// A Notification received from Postgres NOTIFY / LISTEN.
    /// </summary>
    public record PostgresNotification
    {

        public required int PID { get; set; }
        public required string Channel { get; set; }
        public required string Payload { get; set; }
    }

    /// <summary>
    /// Handles Notifications received from a Postgres Channel.
    /// </summary>
    public interface IPostgresNotificationHandler
    {

        public event EventHandler<List<PostgresNotification>>? NotificationReceived;

        ValueTask HandleNotificationAsync(List<PostgresNotification> notifications, CancellationToken cancellationToken);
    }

   public class NotificationHandlerImpl : IPostgresNotificationHandler
    {
        private readonly ILogger<NotificationHandlerImpl> _logger;

        public NotificationHandlerImpl(ILogger<NotificationHandlerImpl> logger)
        {
            _logger = logger;
        }

        public event EventHandler<List<PostgresNotification>>? NotificationReceived;

        public ValueTask HandleNotificationAsync(List<PostgresNotification> notifications, CancellationToken cancellationToken)
        {
            // add loging, if necessary

            NotificationReceived?.Invoke(this, notifications);

            return ValueTask.CompletedTask;
        }
    }
}

The handler will be called from a hosted background service. In this example, we will be using already existing IDbContextFactory to get the connection and a ConcurrentQueue to bundle notifications and dispatch them through the handler once a second. This should reduce the risk of backpressure and make sure our UI doesn’t hang or jitter.

namespace Project.Services
{
    /// <summary>
    /// Options to configure the <see cref="PostgresNotificationService"/>.
    /// </summary>
    public class PostgresNotificationServiceOptions
    {
        public required string ChannelName { get; set; }
        public int MaxCapacity { get; set; } = 100; // Maximum Capacity of unhandled Notifications.
    }

    /// <summary>
    /// This Service waits for Notifications received on a given Postgres Channel name.
    /// </summary>
    public class PostgresNotificationService : BackgroundService
    {
        private readonly ILogger<PostgresNotificationService> _logger;

        private readonly PostgresNotificationServiceOptions _options;
        private readonly IPostgresNotificationHandler _postgresNotificationHandler;
        private readonly IDbContextFactory<DbContextImpl> _dbContextFactory;
        private readonly ConcurrentQueue<PostgresNotification> _notificationQueue = new();

        public PostgresNotificationService(ILogger<PostgresNotificationService> logger, 
            IOptions<PostgresNotificationServiceOptions> options, 
            IDbContextFactory<DbContextImpl> dbContextFactory, 
            IPostgresNotificationHandler postgresNotificationHandler)
        {
            _logger = logger;
            _options = options.Value;
            _dbContextFactory = dbContextFactory;
            _postgresNotificationHandler = postgresNotificationHandler;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            
            var channel = Channel.CreateBounded<PostgresNotification>(new BoundedChannelOptions(_options.MaxCapacity)
            {
                SingleReader = true,
                SingleWriter = true,
                FullMode = BoundedChannelFullMode.Wait
            });

            await Task.WhenAny(
                    SetupPostgresAsync(channel, stoppingToken), 
                    ProcessChannelAsync(channel, stoppingToken),
                    DispatchNotificationsAsync(stoppingToken))
                .ConfigureAwait(false);

        }

        // Initializes the Postgres Listener by issueing a LISTEN Command.
        async Task SetupPostgresAsync(Channel<PostgresNotification> channel, 
                CancellationToken cancellationToken)
        {
                await using var dbContext = await _dbContextFactory.CreateDbContextAsync();


               // When using Postgres in dotnet, your dbContext is actually an instance of `NpgsqlConnection`
               var connection = dbContext.Database.GetDbConnection() as NpgsqlConnection;

               await connection!.OpenAsync(cancellationToken);

                connection.Notification += (sender, x) =>
                {
                    var notification = new PostgresNotification
                    {
                        Channel = x.Channel,
                        PID = x.PID,
                        Payload = x.Payload,
                    };

                    channel.Writer.TryWrite(notification);
                };

                using (var command = new NpgsqlCommand($"LISTEN {_options.ChannelName}", connection))
                {
                    await command
                        .ExecuteNonQueryAsync(cancellationToken)
                        .ConfigureAwait(false);
                }

                // And now we are putting the Connection into the Wait State,
                // until the Cancellation is requested.
                while (!cancellationToken.IsCancellationRequested)
                {
                    await connection
                        .WaitAsync(cancellationToken)
                        .ConfigureAwait(false);
                }
       }

       // Add incoming notifications to the queue for reading
       async Task ProcessChannelAsync(Channel<PostgresNotification> channel, CancellationToken cancellationToken)
       {
                await foreach (var notification in channel.Reader.ReadAllAsync(cancellationToken))
                {
                     _notificationQueue.Enqueue(notification);
                 }
        }

        // Dispatch notificatione every so often (i.e. 1 second), if any exist in a queue
        private async Task DispatchNotificationsAsync(CancellationToken stoppingToken)
        {
             while(!stoppingToken.IsCancellationRequested)
             {
                 await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
                 var notifications = new List<PostgresNotification>();
                 while(_notificationQueue.TryDequeue(out var notification))
                 {
                     notificaitons.Add(notification);
                 }

                 if(notifications.Count != 0)
                 {
                     // log, if needed
                    await _postgresNotificationHandler.HandleNotificationsAsync(notifications, stoppingToken);
                 }
             }
        }
    }
}

Now we need to register everything with DI:

builder.Services.AddSingleton<IPostgresNotificationHandler, NotificationHandlerImpl>();

builder.Services.Configure<PostgresNotificationServiceOptions>(o => o.ChannelName = "my_table_event");
builder.Services.AddHostedService<PostgresNotificationService>();

And use it in our Blazor component:

@code
{
    [Inject] public required IPostgresNotificationHandler NotificationHandler;

    protected override void OnInitialized()
    {
        NotificaitonHandler.NotificationReceived += PgNotificationHandler
    }

    public void Dispose()
    {
        NotificationHandler.NotificationReceived -= PgNotificationHandler
    }

    private async void PgNotificationHandler(object? sender, List<PostgresNotification> ns)
    {
        // ...
        // your processing logic here
       // ...
       StateHasChanged();
    }
}

Leave a Reply

Your email address will not be published. Required fields are marked *

GET IN TOUCH

    X
    CONTACT US