PostgreSQL support #7

Closed
jkears opened this issue Feb 22, 2024 · 6 comments
Labels
enhancement New feature or request question Further information is requested

jkears commented Feb 22, 2024

We have been using SQLTransactionalOutbox for a few years now and it works great, thank you so much for your contribution to the greater community!

We have a new domain scenario that requires integration with pgvector in PostgreSQL to generate and query OpenAI based embedding sets.

It would be ideal if we could create an adaptor for SQLTransactionalOutbox that could integrate with PostgreSQL using Npgsql.

Is that possible with your framework?

@cajuncoding cajuncoding self-assigned this Feb 23, 2024
@cajuncoding cajuncoding added the question Further information is requested label Feb 23, 2024
Owner

cajuncoding commented Feb 23, 2024

Hey John (@jkears),

Yeah I remember some initial conversations way back when.... and I'm really happy that the project has been useful & trouble-free for you 🚀.

To answer your question, yes absolutely. The entire project is interface based using constructor injection for all dependencies.

So you can certainly implement support for PostgreSQL (relatively easily).

You are free to do this in your own project as needed, however it'd be awesome if you submit a PR with an implementation that doesn't have any unnecessary dependencies (e.g. use Microsoft.Data.Sql directly, no external ORMs, etc.)...

This SqlTransactionalOutbox project is in use in production in multiple services for my current client, but I just realized I never came back around to mash out some documentation for it here in GitHub that would have answered your question if I had 😁. But unfortunately we don't use PostgreSQL for anything (all SQL Server all-the-way 🚀) so I haven't ever had the need to build the implementation.

I've been focused on a few of my other projects lately... but I'll be sure to get the Doc updates for this back on my todo list....

Owner

cajuncoding commented Feb 23, 2024

Here's some more technical info. for you to look into....

You shouldn't need to worry about the main logical processing handled by the Outbox Processor as all of that should be agnostic of the underlying mechanism(s).

The main interface you would need to implement is ISqlTransactionalOutboxRepository<TUniqueIdentifier, TPayload> which you can find here.

The current concrete implementation of that using SQL Server via Microsoft.Data.SqlClient library is here in the public class SqlServerOutboxRepository<TUniqueIdentifier, TPayload> which you can find here.

So with a new implementation such as PostgreSqlOutboxRepository<TUniqueIdentifier, TPayload> you'd model it pretty much exactly like the SQL Server version. Then it can be provided as the implementation to the rest of the infrastructure, namely:

  • TransactionalOutbox: Is responsible for Adding/Inserting items into the Outbox and takes an ISqlTransactionalOutboxRepository as a constructor dependency as shown here.
  • TransactionalOutboxProcessor: Is responsible for processing the outbox when called on demand/schedule/event/etc. and also takes in the same ISqlTransactionalOutboxRepository as a constructor dependency as shown here.

There are some additional nuances to take note of when implementing the ISqlTransactionalOutboxRepository. Most notably, the table schema & fields are configurable options, so the configuration should be used when forming the raw SQL queries. For SQL Server this is encapsulated in a dedicated QueryBuilder class, which could be modelled similarly here.
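
As a rough illustration of building SQL from configuration rather than hard-coded names, here is a minimal sketch for PostgreSQL. Everything in it is an assumption made for illustration: `OutboxTableConfig`, `PostgreSqlOutboxQueryBuilder`, the column properties, and the `@uniqueId` / `@payload` parameter names are hypothetical and are not the library's actual types or option names. The only PostgreSQL-specific point is that identifiers are quoted with double quotes.

```csharp
using System;

// Hypothetical configuration type for illustration only; the real option
// names are defined by the library's own configuration class.
public sealed record OutboxTableConfig(
    string SchemaName,
    string TableName,
    string UniqueIdColumn,
    string PayloadColumn);

// Hypothetical query builder; not part of SqlTransactionalOutbox.
public static class PostgreSqlOutboxQueryBuilder
{
    // PostgreSQL quotes identifiers with double quotes; an embedded
    // double quote is escaped by doubling it.
    public static string QuoteIdentifier(string name)
    {
        if (string.IsNullOrWhiteSpace(name))
            throw new ArgumentException("Identifier cannot be empty.", nameof(name));
        return "\"" + name.Replace("\"", "\"\"") + "\"";
    }

    // Builds a parameterized INSERT whose table and column names come
    // entirely from the supplied configuration.
    public static string BuildInsertSql(OutboxTableConfig config)
    {
        var table = $"{QuoteIdentifier(config.SchemaName)}.{QuoteIdentifier(config.TableName)}";
        var columns = $"{QuoteIdentifier(config.UniqueIdColumn)}, {QuoteIdentifier(config.PayloadColumn)}";
        return $"INSERT INTO {table} ({columns}) VALUES (@uniqueId, @payload)";
    }
}
```

Note that quoted identifiers are case-sensitive in PostgreSQL, so the configured names would have to match the casing used when the table was created.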

Finally, once it's all implemented you could certainly use it! But to make things really elegant it's best to sprinkle in some Extension methods on the PostgreSQL classes to make things REALLY easy, as we did for the SQL Server Transaction and other classes:

  • SqlClientOutboxCustomExtensions are here...
  • SqlClientOutboxProcessingCustomExtensions are here...

For more detailed investigation I recommend walking this message backwards 😁 (or outside to inner workings)... from the Extension methods that make everything super easy (using Default implementations) down into lower levels where you have full control over everything and can customize to your heart's content.

Enjoy!

Author

jkears commented Mar 6, 2024

@cajuncoding Thanks so much, I just saw your helpful instructions but was experimenting, and hit a bit of a wall and I am not sure how to handle it.
In the AcquireAppLock method where it calls CreateAcquireLockSqlCommand...

 
            var lockScope = NpgsqlTransaction != null ? SqlServerAppLockScope.Transaction : SqlServerAppLockScope.Session;

            using var sqlCmd = SqlAppLockCommandBuilder.CreateAcquireLockSqlCommand(
                sqlConn, 
                lockName,
                lockScope,
                acquisitionTimeoutSeconds,
                NpgsqlCommandTimeout,
                NpgsqlTransaction
            );

... that requires specific stored procedures within SQL Server.

public class SqlServerStoredProcNames
{
    public const string AcquireLock = "dbo.sp_getapplock ";
    public const string ReleaseLock = "dbo.sp_releaseapplock";
}

How should I handle this?

Author

jkears commented Mar 6, 2024

I used PostgreSQL's advisory locks as a substitute for sp_getapplock, which I hope can provide the same behavior ...

   public static async Task<SqlServerAppLock> AcquireAppLockAsync(
        this NpgsqlConnection NpgsqlConnection, 
        string lockName,
        int acquisitionTimeoutSeconds = 0, 
        bool throwsException = true,
        int? NpgsqlCommandTimeout = null,
        NpgsqlTransaction NpgsqlTransaction = null
    )
    {
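        // Prefer the transaction's connection; fall back to the connection being extended
        // (NpgsqlTransaction defaults to null, so it must not be dereferenced directly).
        var sqlConn = NpgsqlTransaction?.Connection ?? NpgsqlConnection
            ?? throw new ArgumentNullException(nameof(NpgsqlConnection), "A connection is required to acquire the lock.");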
        var lockKey = GenerateLockKey(lockName);
        var acquired = await TryAcquireAdvisoryLockAsync(sqlConn, lockKey, NpgsqlTransaction);
        var acquisitionResult = acquired ? SqlServerAppLockAcquisitionResult.AcquiredImmediately : SqlServerAppLockAcquisitionResult.FailedDueToTimeout;

        var resultAppLock = new SqlServerAppLock(
            lockName,
            SqlServerAppLockScope.Session, // Or Transaction, based on your context
            acquisitionResult,
            releaseAction: () => throw new NotSupportedException(),
            releaseActionAsync: async () =>
            {
                await ReleaseAdvisoryLockAsync(sqlConn, lockKey);
            }
        );

        if (!acquired && throwsException)
        {
            throw new TimeoutException($"Failed to acquire lock '{lockName}' within {acquisitionTimeoutSeconds} seconds.");
        }

        return resultAppLock;
    }

    public static int GenerateLockKey(string lockName)
    {
        if (string.IsNullOrEmpty(lockName))
            throw new ArgumentException("Lock name cannot be null or empty.", nameof(lockName));

        // string.GetHashCode() is randomized per process on .NET Core / .NET 5+, so it cannot be
        // used here: every process must derive the same key for the same lock name.
        // A deterministic FNV-1a hash over the UTF-8 bytes is used instead.
        unchecked
        {
            var hash = 2166136261u; // FNV-1a 32-bit offset basis
            foreach (var b in System.Text.Encoding.UTF8.GetBytes(lockName))
                hash = (hash ^ b) * 16777619u; // FNV-1a 32-bit prime; overflow wraps

            // Mask off the sign bit so the key is non-negative
            return (int)(hash & 0x7FFFFFFF);
        }
    }

    private static async Task<bool> TryAcquireAdvisoryLockAsync(NpgsqlConnection sqlConn, int lockKey, NpgsqlTransaction sqlTransaction)
    {
        await using var cmd = new NpgsqlCommand($"SELECT pg_try_advisory_lock({lockKey})", sqlConn, sqlTransaction);
        var result = await cmd.ExecuteScalarAsync();
        return (bool)result;
    }

The Release as per ...

public static async Task<bool> ReleaseAdvisoryLockAsync(NpgsqlConnection sqlConn, int lockKey)
{
    await using var cmd = new NpgsqlCommand($"SELECT pg_advisory_unlock({lockKey})", sqlConn);
    var result = await cmd.ExecuteScalarAsync();
    return (bool)result;
}

Either way, it seems to process the events and push the OutboxItem to the Outbox Publisher, which is Azure Service Bus, and thus I think I have it working ... once I know for sure I will share the changes, which are not that much. Also, I will likely create the same for MySQL.
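
One thing worth noting about the code above: pg_try_advisory_lock returns immediately, so acquisitionTimeoutSeconds is never actually honored. A minimal sketch of one way to approximate a timeout is to poll the try-acquire call until it succeeds or the time runs out. The `AdvisoryLockRetry` class and its method are hypothetical helpers for illustration, not part of SqlTransactionalOutbox or Npgsql, and the poll interval is an arbitrary choice.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper; names are illustrative only.
public static class AdvisoryLockRetry
{
    // Calls tryAcquire repeatedly until it returns true or the timeout elapses.
    // A timeout of zero results in exactly one attempt.
    public static async Task<bool> TryAcquireWithTimeoutAsync(
        Func<Task<bool>> tryAcquire,
        TimeSpan timeout,
        TimeSpan pollInterval)
    {
        if (tryAcquire == null) throw new ArgumentNullException(nameof(tryAcquire));

        var stopwatch = Stopwatch.StartNew();
        while (true)
        {
            if (await tryAcquire()) return true;
            if (stopwatch.Elapsed >= timeout) return false;
            await Task.Delay(pollInterval);
        }
    }
}
```

Inside AcquireAppLockAsync this could wrap the existing call, for example `await AdvisoryLockRetry.TryAcquireWithTimeoutAsync(() => TryAcquireAdvisoryLockAsync(sqlConn, lockKey, NpgsqlTransaction), TimeSpan.FromSeconds(acquisitionTimeoutSeconds), TimeSpan.FromMilliseconds(250))`. Polling is a simple option; it does not give waiters any ordering guarantee.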

Author

jkears commented Mar 6, 2024

This is closed but I will follow up later.

@jkears jkears closed this as completed Mar 6, 2024
Owner

Hi John,

Yes I believe you are correct 👍 I quickly looked and the closest mechanism offered by PostgreSQL does appear to be explicit advisory locks.

I think you may be able to simplify a bit by overriding/implementing the behavior in the lower level repository here:

public virtual async Task<IAsyncDisposable> AcquireDistributedProcessingMutexAsync()

Then you could return your own IAsyncDisposable implementation that releases the lock when disposed… or null if not acquired.

This would de-couple your code from the SqlAppLock library which is targeted at SQL Server.

And then the framework should be able to handle everything.
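
A minimal sketch of what such an IAsyncDisposable could look like with Npgsql, assuming a session-level advisory lock. `PostgresAdvisoryLockHandle` is a hypothetical type, not part of SqlTransactionalOutbox, and how the numeric lock key is derived is left to the caller. The connection is assumed to be open already and to stay open until the handle is disposed, since a session-level advisory lock can only be released by the session that took it.

```csharp
using System;
using System.Threading.Tasks;
using Npgsql;

// Hypothetical handle type: owns a session-level advisory lock and
// releases it when disposed.
public sealed class PostgresAdvisoryLockHandle : IAsyncDisposable
{
    private readonly NpgsqlConnection _connection;
    private readonly long _lockKey;
    private bool _released;

    private PostgresAdvisoryLockHandle(NpgsqlConnection connection, long lockKey)
    {
        _connection = connection;
        _lockKey = lockKey;
    }

    // Returns a handle when the lock was acquired, or null when another
    // session currently holds it. Does not wait.
    public static async Task<PostgresAdvisoryLockHandle?> TryAcquireAsync(
        NpgsqlConnection connection, long lockKey)
    {
        await using var cmd = new NpgsqlCommand("SELECT pg_try_advisory_lock(@key)", connection);
        cmd.Parameters.AddWithValue("key", lockKey);
        var acquired = await cmd.ExecuteScalarAsync() is true;
        return acquired ? new PostgresAdvisoryLockHandle(connection, lockKey) : null;
    }

    // Releases the lock once; repeated calls are no-ops.
    public async ValueTask DisposeAsync()
    {
        if (_released) return;
        _released = true;
        await using var cmd = new NpgsqlCommand("SELECT pg_advisory_unlock(@key)", _connection);
        cmd.Parameters.AddWithValue("key", _lockKey);
        await cmd.ExecuteScalarAsync();
    }
}
```

An override of AcquireDistributedProcessingMutexAsync could then return the result of TryAcquireAsync directly, so that a null return signals that the mutex was not acquired.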

@cajuncoding cajuncoding added the enhancement New feature or request label Mar 7, 2024