Dataverse + Azure Service Bus Queue + Azure Function for processing long operations

As you all know, in Dataverse there is a hard 2-minute time limit for an operation to complete, which you can read about here. Most of the time, we stumble into a scenario where we must bypass this limitation. If your project is on the cloud, we can do it using an Azure Service Bus Queue + an Azure Function. Although the Azure Service Bus Queue is not strictly required, it gives us the benefit of enforcing a queuing policy (for example, FIFO processing, either async or sync, which will be described later).

Create Azure Service Bus Queue

First, I created the below resource (Service Bus):

Create Azure Service Bus

Once the Service Bus is created > go to the resource > Entities > Queues > create a new Queue like in the below screenshot:

Create new Queue

As you can see above, I created a new Queue with the name “demo-queue”. All the settings should be chosen depending on your scenario; since this blog post is for demo purposes, I left all the properties at their defaults.

Next, we need to go to Settings > Shared access policies > Get the Primary Connection String of the Service Bus:

Copy the Primary Connection String of your Service Bus

Open your Plugin Registration Tool > Register > Register New Service Endpoint:

Set the Queue Name based on the queue you created before; you can also change the “Message Format” to whatever you like. For this demo, I set it to “JSON”:

Set the Endpoint

Register Step for the Service Endpoint

Next, we can register a new step on the Service Endpoint that we created earlier. The purpose of this is to understand the message that will be sent to the Queue (so we can design the code around it). Just right-click on your Service Endpoint > Register New Step > set all the information depending on your scenario:

Register new step

As you can see in the above screenshot, I registered the step on Update of Contact, with Filtering Attributes set to “firstname” and “lastname”, and on Post Operation (Asynchronous). You can change the Filtering Attributes so the step only runs when any of those attributes change. Besides this method, if you are required to run more complex validation logic before calling the Queue, you can also call the Service Endpoint manually from a plugin, which you can check here (a minimal sketch is below).
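
If you go the manual route, a minimal plugin sketch could look like the below. The Service Endpoint Id is a placeholder you would replace with the Id of the endpoint registered earlier, and the validation check is just an illustration:

using System;
using Microsoft.Xrm.Sdk;

public class PostToQueuePlugin : IPlugin
{
    public void Execute(IServiceProvider serviceProvider)
    {
        var context = (IPluginExecutionContext)serviceProvider
            .GetService(typeof(IPluginExecutionContext));
        var notificationService = (IServiceEndpointNotificationService)serviceProvider
            .GetService(typeof(IServiceEndpointNotificationService));

        // Run your complex validation logic here and only post when it passes
        var target = (Entity)context.InputParameters["Target"];
        if (!target.Contains("firstname") && !target.Contains("lastname")) return;

        // Placeholder Id - use the Id of your registered Service Endpoint
        var serviceEndpointId = new Guid("00000000-0000-0000-0000-000000000000");
        notificationService.Execute(
            new EntityReference("serviceendpoint", serviceEndpointId), context);
    }
}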

From this point, you can test the Queue and peek at the message (in my demo, I just need to update a Contact):

Check if the trigger is correct
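
By the way, if you want to peek at queue messages from code rather than the portal, here is a minimal sketch using the Azure.Messaging.ServiceBus package (an additional dependency, not part of the Function project below). Peek reads messages without locking or removing them:

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class QueuePeeker
{
    public static async Task PeekAsync(string serviceBusConnectionString)
    {
        // ServiceBusClient is IAsyncDisposable; dispose it when done
        await using var client = new ServiceBusClient(serviceBusConnectionString);
        ServiceBusReceiver receiver = client.CreateReceiver("demo-queue");

        // Peek does not lock or remove the message from the queue
        ServiceBusReceivedMessage message = await receiver.PeekMessageAsync();
        Console.WriteLine(message?.Body.ToString() ?? "No messages in the queue.");
    }
}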

Create Azure Functions

Next, open your Visual Studio > if you don’t have the Azure Functions project template, follow this link to get your machine ready > name your project “DataverseProcessQueue” > set the Function as “Service Bus Queue trigger”, give the Connection String Name as “ServiceBusConnectionString” and the Queue Name as “demo-queue” (as we mentioned before) > click the Create button:


Next, we need to add the Microsoft.PowerPlatform.Dataverse.Client, Microsoft.Azure.Functions.Extensions, and Microsoft.Extensions.DependencyInjection NuGet packages to our project:

The full content of my .csproj is like the below:

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net6.0</TargetFramework>
    <AzureFunctionsVersion>v4</AzureFunctionsVersion>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1.0" />
    <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="7.0.0" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="4.3.0" />
    <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.1.1" />
    <PackageReference Include="Microsoft.PowerPlatform.Dataverse.Client" Version="1.1.12" />
  </ItemGroup>
  <ItemGroup>
    <None Update="host.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
    <None Update="local.settings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
      <CopyToPublishDirectory>Never</CopyToPublishDirectory>
    </None>
  </ItemGroup>
</Project>

Here is the host.json file:

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  }
}

And for the local.settings.json, I’m using:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet"
  },
  "ConnectionStrings": {
    "ServiceBusConnectionString": "your-sevicebus-endpoint",
    "DataverseConnectionString": "your-dataverse-connectionstring"
  }
}
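
For the DataverseConnectionString, the ServiceClient accepts the standard Dataverse connection-string format. For example, with a service principal (all values below are placeholders):

AuthType=ClientSecret;Url=https://yourorg.crm.dynamics.com;ClientId=your-app-id;ClientSecret=your-client-secret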

Here is the code that I applied in Function1.cs:

using System;
using System.IO;
using System.Text;
using System.Threading;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.PowerPlatform.Dataverse.Client;
using Microsoft.Xrm.Sdk;
[assembly: FunctionsStartup(typeof(DataverseProcessQueue.Startup))]
namespace DataverseProcessQueue
{
    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            var connectionStringSection = builder.GetContext().Configuration.GetSection("ConnectionStrings");
            var dataverseConnectionString = connectionStringSection["DataverseConnectionString"];
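            // Raise the client's max connection timeout so long-running Dataverse calls are not cut off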
            ServiceClient.MaxConnectionTimeout = TimeSpan.FromMinutes(30);
            builder.Services.AddScoped((_) => new ServiceClient(dataverseConnectionString));
        }
    }
    public class ProcessQueueFunction
    {
        private readonly ServiceClient _serviceClient;
        public ProcessQueueFunction(ServiceClient serviceClient)
        {
            _serviceClient = serviceClient;
        }
        [FunctionName("ProcessQueueFunction")]
        public void Run([ServiceBusTrigger("demo-queue", Connection = "ServiceBusConnectionString")] string message, ILogger log)
        {
            var executionContext = DeserializeJsonString<RemoteExecutionContext>(message);
            var entity = (Entity)executionContext.InputParameters["Target"];
            var start = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
            var name =
                $"{entity.GetAttributeValue<string>("firstname")} {entity.GetAttributeValue<string>("lastname")}";
            var modifiedOn = ParseToDateTime(entity.GetAttributeValue<string>("modifiedon"));
            var modifiedString = modifiedOn.ToString("yyyy-MM-dd HH:mm:ss.fff");
            log.LogInformation($"Processing {executionContext.CorrelationId} with name '{name}'.");
            // Heavy operation below 🙂
            Thread.Sleep(TimeSpan.FromMinutes(2));
            var end = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
            var history = $"{executionContext.CorrelationId}: {start} - {end}. {modifiedString}";
            var createHistory = new Entity("tmy_alltypesattribute")
            {
                ["tmy_name"] = name,
                ["tmy_textarea"] = history,
                ["tmy_customer"] = entity.ToEntityReference()
            };
            _serviceClient.Create(createHistory);
        }
        private DateTime ParseToDateTime(string dateString)
        {
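            // Dataverse JSON dates arrive as "/Date(1672531200000)/" - a Unix timestamp in milliseconds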
            var unixTimestampMilliseconds = long.Parse(dateString.Substring(6, dateString.Length - 8)); // Extract the Unix timestamp in milliseconds
            var dateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(unixTimestampMilliseconds);
            var dateTime = dateTimeOffset.UtcDateTime;
            return dateTime;
        }
        private TRemoteContextType DeserializeJsonString<TRemoteContextType>(string jsonString)
        {
            // Deserialize the JSON payload directly into the requested contract type
            var serializer = new System.Runtime.Serialization.Json.DataContractJsonSerializer(typeof(TRemoteContextType));
            using var ms = new MemoryStream(Encoding.Unicode.GetBytes(jsonString));
            return (TRemoteContextType)serializer.ReadObject(ms);
        }
    }
}

In the above code, you can see how to set up dependency injection for the ServiceClient (although people may debate whether to always inject an interface instead of the implementation class directly, I have my reasons for using the implementation class here). Then in the ProcessQueueFunction class, the logic deserializes the JSON string into a RemoteExecutionContext. In the main logic, we sleep the thread for 2 minutes to mimic a heavy operation, and then create a tmy_alltypesattribute record.

With all the above in place, here is the demo that I’m doing:

Updating the same data with different Last Names

As you can see in the above screenshot, I’m updating the same record almost at the same time.

Here is the result from the Azure Function console + Dataverse:

As you can see in the above screenshot, “Raharjo Tab 1” and “Raharjo Tab 2” were processed at almost the same time (but “Tab 1” is indeed the update that I made first).

Previously, I thought that the way the Queue works (by default) is to lock a message until it is finished, and only then process the next message. But based on the debugging result, the queue only helps with the ordering – first in, first out (FIFO). Once a message is retrieved by the system, it is processed immediately (async), so multiple messages can run in parallel. This behavior can be changed to fit your scenario; please take note that if you configure the Function to process only 1 message at a time, the system will wait until the previous message is finished (sync). Also note that this concurrency setting applies per Function instance.

To accomplish the above (FIFO sync), we need to update our host.json:

{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "maxConcurrentCalls": 1
      }
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  }
}

If somehow your changes are not reflected, try a Clean + Rebuild from Visual Studio.

Here is the result:

As you can see in the above result, the yellow one is the first message being processed; the red one was executed 2 minutes later.

For this blog post, I will stop here (testing via debugging only). But the above code is ready to be published to the cloud if you want. 😎

Summary

Again, to learn more about the features that you can implement using Azure Service Bus Queue, you can read here. If you want to use this approach, there are several points that we need to plan for:

  • Error mechanism: over the past years, I have seen lots of wrong implementations where the developer does not apply any error-handling mechanism. When an error happens in the Azure Function/program that processes the queue, Service Bus will retry the message (by default up to 10 times, after which it is moved to the dead-letter queue). If your program depends on the ordering, this will also be a problem (see the sketch after this list).
  • As you can see in the above scenario, we can implement FIFO sync/async. If you are thinking of applying FIFO sync, you need to expect that the processing time will be much slower compared to the default implementation (FIFO async).
  • Using Azure Functions, we can bypass the 2-minute time limitation in Dataverse.
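
As a rough sketch of that error mechanism (assuming the same trigger binding as in the Function above; ProcessMessage is a hypothetical stand-in for the real work), logging and rethrowing lets Service Bus retry the message and eventually dead-letter it:

using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace DataverseProcessQueue
{
    public class ProcessQueueFunctionWithRetry
    {
        [FunctionName("ProcessQueueFunctionWithRetry")]
        public void Run(
            [ServiceBusTrigger("demo-queue", Connection = "ServiceBusConnectionString")] string message,
            ILogger log)
        {
            try
            {
                // The real work (deserialize + Dataverse calls) from Function1.cs goes here
                ProcessMessage(message);
            }
            catch (Exception ex)
            {
                // Log and rethrow: Service Bus will redeliver the message until
                // MaxDeliveryCount (10 by default) is reached, then move it to the
                // dead-letter queue where you can inspect or replay it
                log.LogError(ex, "Failed to process queue message.");
                throw;
            }
        }

        private void ProcessMessage(string message)
        {
            // Hypothetical placeholder for the processing logic shown earlier
        }
    }
}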

Happy CRM-ing!

Author: temmyraharjo

Microsoft Dynamics 365 Technical Consultant, KL Power Platform User Community Leader, Student Forever, Test Driven Development, and Human Code enthusiast.
