Search code examples
Tags: azure-blob-storage, transactionscope, two-phase-commit

Generic ResourceManager/IEnlistmentNotification for Azure Blob Storage operations to achieve 2 phase commit


My application uses Azure SQL and Azure Blob Storage for some business requirements. In most cases it needs atomic transactions that span both the database and Blob Storage: if the DB write fails, the blob change should be rolled back as well (all or nothing). On the DB side I can use TransactionScope, but Blob Storage has no direct equivalent, so I decided to implement a two-phase commit with the help of the IEnlistmentNotification interface. It works as expected, but I am now trying to create a common class/implementation that supports all Blob Storage operations, or at least the most frequently used ones (upload, delete, SetMetadata, ...). I have no idea how to build such an implementation. Is this possible? Any code samples would help me a lot.

Resource Manager

/// <summary>
/// Volatile resource manager that lets <see cref="AzureBlobStore"/> operations take part
/// in the ambient <see cref="Transaction"/>. Operations are queued with
/// <see cref="EnlistOperation"/>, executed during the prepare phase and compensated
/// (via <see cref="AzureBlobStore.RollBack"/>) when the transaction does not commit.
/// </summary>
public class AzureBlobStorageResourceManager : IEnlistmentNotification, IDisposable
    {
        // Operations whose DoWork has finished successfully. Only these are compensated,
        // so a rollback never touches a blob this transaction did not write.
        private readonly List<AzureBlobStore> _completed = new List<AzureBlobStore>();

        // Created on the first EnlistOperation call, which is also when the manager enlists.
        private List<AzureBlobStore> _operations;
        private bool _disposedValue;

        /// <summary>
        /// Queues <paramref name="operation"/> for execution in the prepare phase. The first
        /// call enlists this manager in <see cref="Transaction.Current"/>, if there is one.
        /// </summary>
        /// <param name="operation">The blob operation to run as part of the transaction.</param>
        /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
        public void EnlistOperation(AzureBlobStore operation)
        {
            if (operation is null)
            {
                throw new ArgumentNullException(nameof(operation));
            }

            if (_operations is null)
            {
                // Create the list before enlisting so every notification sees a valid list.
                _operations = new List<AzureBlobStore>();
                Transaction.Current?.EnlistVolatile(this, EnlistmentOptions.None);
            }

            _operations.Add(operation);
        }

        /// <summary>
        /// Phase two, success: the work is permanent, so nothing is left to compensate.
        /// </summary>
        public void Commit(Enlistment enlistment)
        {
            try
            {
                _completed.Clear();

                // NOTE(review): this relies on AzureBlobStore.Dispose only releasing resources;
                // verify that it does not remove the blob that was just committed.
                DisposeOperations();
            }
            finally
            {
                enlistment.Done();
            }
        }

        /// <summary>
        /// The transaction outcome is unknown; the completed operations are compensated,
        /// exactly as for a rollback.
        /// </summary>
        public void InDoubt(Enlistment enlistment)
        {
            try
            {
                TraceFailures(RollBackCompleted());
            }
            finally
            {
                enlistment.Done();
            }
        }

        /// <summary>
        /// Phase one: runs every queued operation and votes. Any failure undoes the
        /// operations that already completed and forces the transaction to roll back.
        /// </summary>
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            try
            {
                if (_operations != null)
                {
                    foreach (var blobOperation in _operations)
                    {
                        // IEnlistmentNotification is synchronous, so block until the work is done.
                        // Merely starting the task would vote "prepared" before the upload has
                        // finished and would never observe its failure.
                        blobOperation.DoWork().GetAwaiter().GetResult();
                        _completed.Add(blobOperation);
                    }
                }
            }
            catch (Exception ex)
            {
                // Compensate here as well: an enlistment that forces the rollback itself
                // should not count on a later Rollback notification. If one does arrive,
                // it is a no-op because the completed list has already been cleared.
                var failures = RollBackCompleted();
                failures.Insert(0, ex);
                preparingEnlistment.ForceRollback(
                    failures.Count == 1 ? ex : new AggregateException(failures));
                return;
            }

            preparingEnlistment.Prepared();
        }

        /// <summary>
        /// The transaction aborted: compensates every operation that completed.
        /// </summary>
        public void Rollback(Enlistment enlistment)
        {
            try
            {
                TraceFailures(RollBackCompleted());
            }
            finally
            {
                enlistment.Done();
            }
        }

        /// <summary>
        /// Disposes the queued operations.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (_disposedValue) return;

            if (disposing)
            {
                DisposeOperations();
            }

            _disposedValue = true;
        }

        ~AzureBlobStorageResourceManager() => Dispose(false);

        // Compensates completed operations in reverse order and keeps going when one of
        // them fails, so a single failure cannot strand the remaining blobs.
        private List<Exception> RollBackCompleted()
        {
            var failures = new List<Exception>();

            for (var i = _completed.Count - 1; i >= 0; i--)
            {
                try
                {
                    _completed[i].RollBack().GetAwaiter().GetResult();
                }
                catch (Exception ex)
                {
                    failures.Add(ex);
                }
            }

            _completed.Clear();
            return failures;
        }

        // Compensation is best effort; failures are reported instead of being thrown from
        // inside a transaction notification.
        private static void TraceFailures(List<Exception> failures)
        {
            foreach (var failure in failures)
            {
                System.Diagnostics.Trace.TraceError("Blob operation rollback failed: {0}", failure);
            }
        }

        // Safe to call when nothing was ever enlisted.
        private void DisposeOperations()
        {
            if (_operations is null) return;

            foreach (var operation in _operations)
            {
                operation.Dispose();
            }
        }
    }

Actual Blob Operation

/// <summary>
/// A single blob upload together with its compensating action, so that a resource
/// manager can execute it (<see cref="DoWork"/>) and undo it (<see cref="RollBack"/>).
/// </summary>
public class AzureBlobStore : IDisposable
    {
        // Only referenced by the commented-out "delete" variant below.
        private string _backupPath;
        private readonly string _blobName;
        private readonly Stream _content;
        private readonly BlobClient _blobClient;
        private bool _disposedValue;

        /// <summary>
        /// Creates an upload operation for <paramref name="blobName"/> in the given container.
        /// </summary>
        /// <param name="containerClient">Container the blob is written to.</param>
        /// <param name="blobName">Name of the blob to upload.</param>
        /// <param name="content">Blob content. The stream is not disposed by this class.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="containerClient"/> or <paramref name="content"/> is null.
        /// </exception>
        /// <exception cref="ArgumentException"><paramref name="blobName"/> is null or blank.</exception>
        public AzureBlobStore(BlobContainerClient containerClient, string blobName, Stream content)
        {
            if (containerClient is null)
            {
                throw new ArgumentNullException(nameof(containerClient));
            }

            if (string.IsNullOrWhiteSpace(blobName))
            {
                throw new ArgumentException("A blob name is required.", nameof(blobName));
            }

            _content = content ?? throw new ArgumentNullException(nameof(content));
            _blobName = blobName;
            _blobClient = containerClient.GetBlobClient(blobName);
        }

        /// <summary>
        /// Uploads the content to the blob.
        /// </summary>
        public async Task DoWork()
        {
            // Rewind so the whole content is sent; non-seekable streams are uploaded from
            // their current position instead of throwing NotSupportedException.
            if (_content.CanSeek)
            {
                _content.Position = 0;
            }

            await _blobClient.UploadAsync(_content).ConfigureAwait(false);

            /*            
            await _blobClient.DeleteAsync(Azure.Storage.Blobs.Models.DeleteSnapshotsOption.IncludeSnapshots).ConfigureAwait(false); 
            */
        }

        /// <summary>
        /// Compensates <see cref="DoWork"/> by deleting the blob (and its snapshots) if it exists.
        /// </summary>
        public async Task RollBack()
        {
            // Compensation logic for Upload
            await _blobClient.DeleteIfExistsAsync(Azure.Storage.Blobs.Models.DeleteSnapshotsOption.IncludeSnapshots).ConfigureAwait(false);

            // Compensation logic for Delete
            /* await _blobClient.UploadAsync(_backupPath); */
        }

        /// <summary>
        /// Marks the operation as disposed. Disposal never touches the blob: undoing the
        /// upload is the job of <see cref="RollBack"/>, and deleting here would remove a
        /// blob whose transaction has already committed.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (_disposedValue) return;

            // Nothing to release: the content stream belongs to the caller and the blob
            // client is not disposed by this class.
            _disposedValue = true;
        }

        ~AzureBlobStore() => Dispose(false);
    }

The code inside /* */ belongs to another blob operation (delete). I am looking for a common way to handle all such operations.


Solution

  • Here you have a sample of how to implement a BlobStorage solution integrated with .NET Transactions.

    First you have the Resource Manager. This handles all the operations and rollback if the transaction fails. It is almost the same as you have.

    /// <summary>
    /// Volatile resource manager for blob operations. Operations are executed immediately;
    /// the manager remembers them and, if the ambient transaction does not commit, rolls
    /// them back in reverse order.
    /// </summary>
    public class BlobStorageResourceManager : IEnlistmentNotification, IBlobStorageResourceManager
    {
        // Operations started in the transaction currently being tracked; null when no
        // transaction is tracked, so the next operation enlists again.
        List<TransactionalBlobOperation>? executedOperations;

        /// <summary>
        /// Records <paramref name="operation"/> for rollback and executes it.
        /// </summary>
        public Task ExecuteOperation(BlobOperation operation)
        {
            AddOperation(operation);
            return operation.ExecuteInTransaction();
        }

        /// <summary>
        /// Records <paramref name="operation"/> for rollback, executes it and returns its result.
        /// </summary>
        public async Task<T> ExecuteOperation<T>(BlobOperation<T> operation)
        {
            AddOperation(operation);
            return (T)(await operation.ExecuteInTransaction())!;
        }

        // Enlists in the ambient transaction on the first operation, then tracks the operation.
        void AddOperation(TransactionalBlobOperation operation)
        {
            if (executedOperations == null)
            {
                executedOperations = new List<TransactionalBlobOperation>();
                Transaction.Current?.EnlistVolatile(this, EnlistmentOptions.None);
            }

            executedOperations.Add(operation);
        }

        /// <summary>
        /// The work has already been done when the operation was executed, so always vote yes.
        /// </summary>
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            preparingEnlistment.Prepared();
        }

        /// <summary>
        /// Nothing to undo; forgets the tracked operations so the instance can be reused.
        /// </summary>
        public void Commit(Enlistment enlistment)
        {
            executedOperations = null;
            enlistment.Done();
        }

        /// <summary>
        /// Rolls back every tracked operation, newest first.
        /// </summary>
        /// <exception cref="AggregateException">One or more operations failed to roll back.</exception>
        public void Rollback(Enlistment enlistment)
        {
            // Detach the list first: the instance is ready for the next transaction even
            // if a rollback below fails.
            var operations = executedOperations;
            executedOperations = null;

            var failures = new List<Exception>();

            if (operations != null)
            {
                for (var i = operations.Count - 1; i >= 0; i--)
                {
                    try
                    {
                        // Notifications are synchronous, so the async rollback must be waited on.
                        operations[i].Rollback().GetAwaiter().GetResult();
                    }
                    catch (Exception ex)
                    {
                        // Keep going so one failure does not prevent the other rollbacks.
                        failures.Add(ex);
                    }
                }
            }

            enlistment.Done();

            if (failures.Count > 0)
            {
                throw new AggregateException("One or more blob operations could not be rolled back.", failures);
            }
        }

        /// <summary>
        /// The outcome is unknown; treated like a rollback.
        /// </summary>
        public void InDoubt(Enlistment enlistment)
        {
            Rollback(enlistment);
        }
    }
    

    Then you have the normal BlobStorage as you would have with no transactions, but with some tweaks to use transactional operations.

    /// <summary>
    /// Blob-backed object storage. Delete and Upload go through the resource manager when
    /// an ambient transaction exists, so they can be rolled back; otherwise they run directly.
    /// </summary>
    public class BlobStorage : IObjectStorage
    {
        readonly string connectionString;
        readonly string containerName;
        readonly IBlobStorageResourceManager blobStorageResourceManager;

        public BlobStorage(
            string connectionString,
            string containerName,
            IBlobStorageResourceManager blobStorageResourceManager)
        {
            this.connectionString = connectionString.ThrowIfNull();
            this.containerName = containerName.ThrowIfNull();
            this.blobStorageResourceManager = blobStorageResourceManager.ThrowIfNull();
        }

        /// <summary>
        /// Deletes the blob named <paramref name="fullFileName"/>.
        /// </summary>
        public Task Delete(string fullFileName)
        {
            var operation = new DeleteBlobOperation(CreateContainerClient(), fullFileName);

            if (IsInTransaction())
            {
                return blobStorageResourceManager.ExecuteOperation(operation);
            }

            return operation.Execute();
        }

        /// <summary>
        /// Opens the blob for reading, or returns null when it does not exist.
        /// </summary>
        public async Task<Stream?> Get(string fullFileName)
        {
            var blobClient = CreateContainerClient().GetBlobClient(fullFileName);

            return await blobClient.ExistsAsync()
                ? await blobClient.OpenReadAsync()
                : null;
        }

        /// <summary>
        /// Opens the blob for reading without checking first whether it exists.
        /// </summary>
        public Task<Stream> GetOrThrow(string fullFileName)
        {
            var blobClient = CreateContainerClient().GetBlobClient(fullFileName);

            return blobClient.OpenReadAsync();
        }

        /// <summary>
        /// Uploads <paramref name="stream"/> as <paramref name="fullFileName"/> and returns
        /// the result of the upload operation.
        /// </summary>
        public async Task<string> Upload(string fullFileName, Stream stream)
        {
            var operation = new UploadBlobOperation(CreateContainerClient(), fullFileName, stream);

            if (IsInTransaction())
            {
                return await blobStorageResourceManager.ExecuteOperation(operation);
            }

            return await operation.Execute();
        }

        // True when the caller is running inside an ambient transaction.
        static bool IsInTransaction() => System.Transactions.Transaction.Current != null;

        // Single place where the container client is built from the configured settings.
        BlobContainerClient CreateContainerClient() =>
            new BlobContainerClient(connectionString, containerName);
    }
    

    Then each operation lives in its own class. Example:

    /// <summary>
    /// Uploads a stream as a new blob and, on rollback, removes the blob it created.
    /// </summary>
    public class UploadBlobOperation : BlobOperation<string>
    {
        readonly BlobContainerClient containerClient;
        readonly string fullFileName;
        readonly Stream stream;

        // Set once the upload has succeeded, i.e. once this operation owns the blob.
        bool uploaded;

        public UploadBlobOperation(
            BlobContainerClient containerClient,
            string fullFileName,
            Stream stream)
        {
            this.containerClient = containerClient.ThrowIfNull();
            this.fullFileName = fullFileName.ThrowIfNullEmptyOrWhiteSpace();
            this.stream = stream.ThrowIfNull();
        }

        /// <summary>
        /// Uploads the stream and returns the absolute URI of the blob.
        /// </summary>
        public override async Task<string> Execute()
        {
            var blobClient = containerClient.GetBlobClient(fullFileName);

            await blobClient.UploadAsync(stream);
            uploaded = true;

            return blobClient.Uri.AbsoluteUri;
        }

        /// <summary>
        /// Same as <see cref="Execute"/>; an upload needs no extra preparation to be reversible.
        /// </summary>
        public override async Task<object?> ExecuteInTransaction()
        {
            return await Execute();
        }

        /// <summary>
        /// Deletes the blob, but only if this operation uploaded it.
        /// </summary>
        public override async Task Rollback()
        {
            // If the upload never succeeded (for instance because a blob with this name
            // already existed), the blob is not ours and deleting it would destroy data.
            if (!uploaded)
            {
                return;
            }

            var blobClient = containerClient.GetBlobClient(fullFileName);

            // IfExists: a blob that is already gone must not fail the rollback.
            await blobClient.DeleteIfExistsAsync();
            uploaded = false;
        }
    }
    

    The usage is like this:

            // Both blob operations take part in the ambient transaction created by the scope.
            using (var transactionScope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
            {
                await storage.Delete(fileName1);
                await storage.Upload(fileName2, stream);

                // Without Complete() the scope aborts on dispose and both operations
                // are rolled back even though they succeeded.
                transactionScope.Complete();
            }
    

    I think you are getting confused in the AzureBlobStore class, where you are trying to have several operations with their respective rollbacks. I think it's better to have them separated.

    More information/full sample https://github.com/michaeltg17/TransactionalBlobStorage