Azure Cosmos DB change feed allows us to listen to all inserts and updates to Cosmos DB collections. Azure Cosmos DB trigger is the easiest way to utilize the feature. This article demonstrates how to sync Cosmos DB collection to Blob Storage with change feed.

Azure Cosmos DB Emulator

Follow Azure Cosmos DB Emulator document to install the emulator. So we can develop Cosmos DB applications locally.

Azure Storage Emulator

Azure Storage Emulator comes with Visual Studio 2019. Follow this document to start the emulator.

Azure Storage Explorer

We can use Azure Storage Explorer to view contents in Azure Storage Emulator.

Getting Started with Azure Functions

If you are new to Azure Functions, follow Quickstart: Create your first function in Azure using Visual Studio to create your first function. Instead of using HTTP trigger, let’s use Cosmos DB trigger when create the project.

Blob Storage Client Dependency Injection

To manipulate blob storage, we need to inject the blob storage client to the function. Use dependency injection in .NET Azure Functions has a clear explaination on how to do it. The following code shows how to configure the blob container client. BlobContainerClient is from Azure.Storage.Blobs nuget package.

builder.Services.AddSingleton(x =>
{
    var configuration = x.GetService<IConfiguration>();
    var container = new BlobContainerClient(configuration["PaymentBlobStorageConnectionString"], configuration["PaymentBlobStorageContainerName"]);
    container.CreateIfNotExists();
    return container;
});

Function

The following implementation first extracts payment model from the document object and then use the injected blob container client to upload payment data. Check out AzureCosmosDBChangeFeedSample for the complete source code.

public class ChangeFeedSample
{
    private readonly BlobContainerClient _blobContainerClient;

    public ChangeFeedSample(BlobContainerClient blobContainerClient)
    {
        _blobContainerClient = blobContainerClient ?? throw new ArgumentNullException(nameof(blobContainerClient));
    }

    [FunctionName(nameof(ChangeFeedSample))]
    public async Task Run(
        [CosmosDBTrigger(
            "ChangeFeedSample",
            "Payment",
            ConnectionStringSetting = "CosmosDBConnectionString",
            LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)]
        IReadOnlyList<Document> documents,
        ILogger log)
    {
        if (documents != null && documents.Count > 0)
        {
            log.LogInformation("Documents modified: " + documents.Count);
            foreach (var document in documents)
            {
                var jsonSerializerOptions = new JsonSerializerOptions
                {
                    PropertyNamingPolicy = JsonNamingPolicy.CamelCase
                };
                var payment = JsonSerializer.Deserialize<Payment>(document.ToString(), jsonSerializerOptions);
                var paymentString = JsonSerializer.Serialize(payment, jsonSerializerOptions);
                var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(paymentString));
                await _blobContainerClient.UploadBlobAsync(document.Id, memoryStream);
            }
        }
    }
}