Using Service Bus Triggered Azure Durable Function with D365 CE

In this post, we’ll see how to use a Service Bus triggered Azure Durable Function with D365 CE to perform CRUD operations.

Requirement: When the Rate Amount (custom field) changes on a Rate (custom entity) record, update the Rate (custom field) on the Revenue Schedule Line (custom entity) records related to that Rate record.

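Of course, there are multiple ways to achieve this. Here we’ll use the following approach: a post-update plugin on the Rate entity posts a custom message to an Azure Service Bus Queue, and a Service Bus triggered Azure Durable Function picks up that message and updates the related records.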

Once the custom message is posted to the queue, we can create the Service Bus triggered Azure Durable Function.
See the links below for more information on Azure Durable Functions and their application patterns:
https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=csharp
https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=csharp#fan-in-out
We’ll use the fan-out/fan-in application pattern in this example: the orchestrator starts multiple activity functions in parallel and waits for all of them to complete.
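To make the pattern concrete before bringing in the Durable Functions types, here is a minimal sketch in plain .NET. It is not the code used in this post: FanOutFanInSketch, UpdateRecordAsync and UpdateAllAsync are hypothetical names, and the "update" is simulated with a short delay.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutFanInSketch
{
    // Hypothetical stand-in for an activity function: pretends to update one
    // record and reports success for any non-empty ID.
    public static async Task<bool> UpdateRecordAsync(Guid recordId)
    {
        await Task.Delay(10); // simulate I/O
        return recordId != Guid.Empty;
    }

    // Fan-out: start one task per record without awaiting each one in turn.
    // Fan-in: await all of them, then aggregate the individual results.
    public static async Task<bool> UpdateAllAsync(IReadOnlyList<Guid> recordIds)
    {
        List<Task<bool>> tasks = recordIds.Select(id => UpdateRecordAsync(id)).ToList();
        bool[] results = await Task.WhenAll(tasks);
        return results.All(succeeded => succeeded);
    }
}
```

The orchestration function later in this post follows the same shape, with context.CallActivityAsync taking the place of the simulated update.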

Below is the code for the Service Bus triggered function, which starts the orchestration function.

[FunctionName("UpdateGlobalRatesOnRSLs")]
public static async Task Run(
    // QueueName and AzureWebJobsServiceBus are key-value pairs in local.settings.json
    [ServiceBusTrigger("%QueueName%", Connection = "AzureWebJobsServiceBus")] Message message,
    MessageReceiver messageReceiver,
    string lockToken,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    string inputMessage = Encoding.UTF8.GetString(message.Body);
    log.LogInformation($"message - {inputMessage}");

    // Dead-letter empty messages and stop; there is nothing to deserialize.
    if (string.IsNullOrWhiteSpace(inputMessage))
    {
        await messageReceiver.DeadLetterAsync(lockToken, "Message content is empty.", "Message content is empty.");
        log.LogInformation("Message Dead Lettered: Message content is empty.");
        return;
    }

    RateObjectConverted rateObjectConverted = DeserializeMessage(inputMessage);

    // Dead-letter messages that lack any of the required fields.
    if (rateObjectConverted.RateID == null || rateObjectConverted.RevenueCodeID == null || rateObjectConverted.StartDate == null || rateObjectConverted.RateModel == null)
    {
        await messageReceiver.DeadLetterAsync(lockToken, "Missing Input.");
        log.LogInformation("Message Dead Lettered: Missing Input.");
        return;
    }

    // Start the orchestration with the serialized, compressed input.
    var orchestrationInput = SerializeAndCompressOrchestrationInput<RateObjectConverted>(rateObjectConverted);
    string instanceId = await starter.StartNewAsync<string>("UpdateGlobalRatesOnRSLs_OrchestrationFunction", orchestrationInput);
    log.LogInformation($"Orchestration Started with ID: {instanceId}");

    // Poll once per second until the orchestration leaves the Pending/Running states.
    var orchestrationStatus = await starter.GetStatusAsync(instanceId);
    log.LogInformation($"Waiting to complete Orchestration function [Status:{orchestrationStatus.RuntimeStatus}][ID:{instanceId}]");

    while (orchestrationStatus.RuntimeStatus == OrchestrationRuntimeStatus.Pending ||
           orchestrationStatus.RuntimeStatus == OrchestrationRuntimeStatus.Running)
    {
        await Task.Delay(1000);
        orchestrationStatus = await starter.GetStatusAsync(instanceId);
    }

    log.LogInformation($"UpdateGlobalRatesOnRSLs Function completed [Instance ID:{instanceId}]");

    // Abandon the message unless the orchestration completed and returned true.
    if (orchestrationStatus.RuntimeStatus != OrchestrationRuntimeStatus.Completed || !(bool)orchestrationStatus.Output)
    {
        await messageReceiver.AbandonAsync(lockToken);
    }
}

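The function dead-letters the message if the content is empty or any required input is missing. Otherwise it starts the orchestration function, polls its status until it finishes, and abandons the message if the orchestration did not report success.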
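The status polling in the function above has no upper bound, so a stuck orchestration would keep the trigger waiting. One possible refinement, sketched here under the assumption that a fixed timeout is acceptable, is a small polling helper with a deadline. PollingSketch and PollUntilAsync are hypothetical names, not part of the Durable Functions SDK.

```csharp
using System;
using System.Threading.Tasks;

public static class PollingSketch
{
    // Repeatedly calls getStatus until isTerminal returns true or the timeout elapses.
    // Returns the last status observed and whether that status was terminal.
    public static async Task<(T Status, bool Completed)> PollUntilAsync<T>(
        Func<Task<T>> getStatus,
        Func<T, bool> isTerminal,
        TimeSpan interval,
        TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        T status = await getStatus();

        while (!isTerminal(status) && DateTime.UtcNow < deadline)
        {
            await Task.Delay(interval);
            status = await getStatus();
        }

        return (status, isTerminal(status));
    }
}
```

In the trigger function, getStatus would wrap starter.GetStatusAsync(instanceId) and isTerminal would check that RuntimeStatus is neither Pending nor Running. When Completed comes back false, the message could be abandoned in the same way as a failed orchestration.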

Below is the code for the orchestration function, which receives the compressed “rateObjectConverted” as input and passes each related record to the activity function.

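[FunctionName("UpdateGlobalRatesOnRSLs_OrchestrationFunction")]
public static async Task<bool> RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    // Decompress and deserialize the input passed by the Service Bus triggered function.
    var compressedInput = context.GetInput<string>();
    var input = StringCompressor.DecompressString(compressedInput);
    var rateObjectConverted = JsonConvert.DeserializeObject<RateObjectConverted>(input);

    // Fetch the Revenue Schedule Lines related to the Rate record.
    // Note: orchestrator code is replayed and is expected to be deterministic, so in
    // production this query is better placed in its own activity function.
    var results = GetRSLs(rateObjectConverted, log).ToList();

    if (results.Count == 0) return true;

    log.LogInformation($"Revenue Schedule Lines Count: {results.Count}");

    // Fan-out: start one activity per related record without awaiting each in turn.
    var tasks = new Task<bool>[results.Count];
    for (int i = 0; i < results.Count; i++)
    {
        tasks[i] = context.CallActivityAsync<bool>("UpdateGlobalRatesOnRSLs_ActivityFunction", results[i]);
    }

    // Fan-in: wait for every activity, then succeed only if all of them did.
    bool[] outcomes = await Task.WhenAll(tasks);
    return outcomes.All(succeeded => succeeded);
}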

The orchestration function gets the records related to the Rate record and calls the activity function for each one. It returns true only if every activity succeeds; the Service Bus triggered function uses that result to decide whether to abandon the message.
The loop that starts the activity calls, followed by Task.WhenAll, is where the fan-out/fan-in pattern is applied.

Here’s the code for the activity function, which updates each record passed from the orchestration function.

[FunctionName("UpdateGlobalRatesOnRSLs_ActivityFunction")]
public static async Task<bool> DoAction([ActivityTrigger] RSLObject entity, ILogger log)
{
    try
    {
        // Created inside the try block so that connection failures are also reported as false.
        var crmApi = await CRMWebApiHelper.GetCRMWebAPI(log);
        var dictionary = new Dictionary<string, object>
        {
            { "dxc_rate", 500 } // static value for simplicity
        };

        // Update the Rate field, then associate the Revenue Schedule Line with the Rate record.
        await crmApi.Update("dxc_revenueschedulelines", entity.RSLID, dictionary);
        await crmApi.Associate("dxc_revenueschedulelines", entity.RSLID, "dxc_revenuescheduleline_dxc_rate", "dxc_rates", entity.RateObjectConverted.RateID.Value);
        log.LogInformation($"RSL ID:{entity.RSLID} processed.");
        return true;
    }
    catch (Exception ex)
    {
        // LogError records the exception together with its stack trace.
        log.LogError(ex, $"Error processing RSL ID:{entity.RSLID} : {ex.Message}");
        return false;
    }
}

Below are some helper methods used in the above functions:

public static RateObjectConverted DeserializeMessage(string message)
{
    RateObjectConverted rateObjectConverted = new RateObjectConverted();

    var rateObject = JsonConvert.DeserializeObject<RateObject>(message);

    // A payload such as "null" deserializes to null; return the empty object so the
    // caller's required-field check dead-letters the message.
    if (rateObject == null) return rateObjectConverted;

    // Convert each string field only when it has a value; the rest stay null.
    if (!string.IsNullOrWhiteSpace(rateObject.EndDate)) rateObjectConverted.EndDate = Convert.ToDateTime(rateObject.EndDate);

    if (!string.IsNullOrWhiteSpace(rateObject.StartDate)) rateObjectConverted.StartDate = Convert.ToDateTime(rateObject.StartDate);

    if (!string.IsNullOrWhiteSpace(rateObject.RateID)) rateObjectConverted.RateID = new Guid(rateObject.RateID);

    if (!string.IsNullOrWhiteSpace(rateObject.RevenueCodeID)) rateObjectConverted.RevenueCodeID = new Guid(rateObject.RevenueCodeID);

    if (!string.IsNullOrWhiteSpace(rateObject.RateAmount)) rateObjectConverted.RateAmount = Convert.ToDecimal(rateObject.RateAmount);

    if (!string.IsNullOrWhiteSpace(rateObject.RatePercentage)) rateObjectConverted.RatePercentage = Convert.ToDecimal(rateObject.RatePercentage);

    if (!string.IsNullOrWhiteSpace(rateObject.RateModel)) rateObjectConverted.RateModel = Convert.ToInt32(rateObject.RateModel);

    return rateObjectConverted;
}

public static string SerializeAndCompressOrchestrationInput<T>(T input)
{
    // Serialize to JSON, then compress to keep the orchestration input small.
    var inputString = JsonConvert.SerializeObject(input);
    var compressedString = StringCompressor.CompressString(inputString);
    return compressedString;
}

private static IEnumerable<RSLObject> GetRSLs(RateObjectConverted rateObjectConverted, ILogger log)
{
    // This method is synchronous, so it blocks on the async helper.
    var api = CRMWebApiHelper.GetCRMWebAPI(log).GetAwaiter().GetResult();

    // Revenue Schedule Lines that share the Rate record's Revenue Code.
    var xml = @"<fetch version='1.0' output-format='xml-platform' mapping='logical' distinct='true'>
                <entity name='dxc_revenuescheduleline'>
                <attribute name='dxc_revenueschedulelineid' />
                <attribute name='dxc_name' />
                <attribute name='dxc_startdate' />
                    <filter type='and'>
                        <condition attribute='dxc_revenuecodeid' operator='eq' uitype='dxc_revenuecode' value='" + rateObjectConverted.RevenueCodeID.ToString() + @"' />
                    </filter>
                </entity>
            </fetch>";

    log.LogInformation($"Revenue Schedule Line FetchXML: {xml}");

    var options = new CRMGetListOptions()
    {
        FormattedValues = true,
        FetchXml = xml
    };
    var result = api.GetList("dxc_revenueschedulelines", options);

    if (result == null || result.Result == null || result.Result.List.Count == 0)
        return new List<RSLObject>();

    return (from r in result.Result.List.Cast<IDictionary<string, object>>()
            select new RSLObject
            {
                RSLID = Guid.Parse(r["dxc_revenueschedulelineid"].ToString()),
                StartDate = r.ContainsKey("dxc_startdate") ? Convert.ToDateTime(r["dxc_startdate"]) : DateTime.UtcNow.Date,
                RateObjectConverted = rateObjectConverted
            });
}

// Defined in the CRMWebApiHelper class, which the activity function and GetRSLs call.
public static async Task<CRMWebAPI> GetCRMWebAPI(ILogger log)
{
    var crmOrganizationUrl = Environment.GetEnvironmentVariable("CRMOrganization");
    var crmOrganizationVersionUrl = Environment.GetEnvironmentVariable("CRMOrganizationVersionUrl");
    var crmUrl = $"{crmOrganizationUrl}{crmOrganizationVersionUrl}";
    log.LogInformation($"CRM Web API URL: {crmUrl}");
    var resultAccessToken = await GetAccessToken();
    var crmApi = new CRMWebAPI(crmUrl, resultAccessToken);
    return crmApi;
}

private static async Task<string> GetAccessToken()
{
    // Client-credentials flow: the app registration authenticates with its own ID and secret.
    var authority = Environment.GetEnvironmentVariable("AuthorityUrl");
    var clientId = Environment.GetEnvironmentVariable("ClientId");
    var crmBaseUrl = Environment.GetEnvironmentVariable("CRMOrganization");
    var clientSecret = Environment.GetEnvironmentVariable("ClientSecret");
    var tenantId = Environment.GetEnvironmentVariable("TenantId");
    var clientCredential = new ClientCredential(clientId, clientSecret);
    var authContext = new AuthenticationContext($"{authority}{tenantId}");
    var authResult = await authContext.AcquireTokenAsync(crmBaseUrl, clientCredential);
    return authResult.AccessToken;
}
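The DeserializeMessage helper above uses Convert.ToDateTime, Convert.ToDecimal and new Guid, all of which throw on malformed input and, for dates and decimals, depend on the host's culture. A more defensive variant might use the TryParse methods instead. The sketch below is only an illustration: SafeParseSketch and its methods are hypothetical names, and treating dates without an offset as UTC is an assumption.

```csharp
using System;
using System.Globalization;

public static class SafeParseSketch
{
    // Returns null instead of throwing when the text is missing or not a valid GUID.
    public static Guid? ParseGuid(string text) =>
        Guid.TryParse(text, out var value) ? value : (Guid?)null;

    // Parses with the invariant culture so "1234.56" means the same on every host.
    public static decimal? ParseDecimal(string text) =>
        decimal.TryParse(text, NumberStyles.Number, CultureInfo.InvariantCulture, out var value)
            ? value
            : (decimal?)null;

    // Assumption: input without an explicit offset is treated as UTC.
    public static DateTime? ParseDate(string text) =>
        DateTime.TryParse(text, CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal, out var value)
            ? value
            : (DateTime?)null;
}
```

With helpers like these, a malformed field stays null and the required-field check in the trigger function dead-letters the message, instead of an exception escaping from the function.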

When running locally, the environment variables used above are read from the local.settings.json file in the project, as shown below:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "CRMOrganization": "https://<crmOrgName>.crm.dynamics.com",
    "ClientSecret": "<clientSecret>",
    "ClientId": "<clientID>",
    "TenantId": "<tenantID>",
    "AuthorityUrl": "https://login.microsoftonline.com/",
    "CRMOrganizationVersionUrl": "/api/data/v9.1/",
    "QueueName": "<QueueName>",
    "AzureWebJobsServiceBus": "<PrimaryConnectionStringOfSharedAccessPolicyOfServiceBusNamespace>"
  }
}
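local.settings.json is only used for local development; once deployed, the same keys have to exist in the Function App's application settings. Since Environment.GetEnvironmentVariable returns null for a missing key, it may help to check the required settings up front. This is a minimal sketch of such a check; SettingsSketch, FindMissing and EnsureConfigured are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SettingsSketch
{
    // Returns the names of required settings that are missing or blank.
    // The lookup is injected so the check can be exercised without real environment variables.
    public static IReadOnlyList<string> FindMissing(
        IEnumerable<string> requiredNames, Func<string, string> lookup)
    {
        return requiredNames.Where(name => string.IsNullOrWhiteSpace(lookup(name))).ToList();
    }

    // Throws a single descriptive exception that lists every missing setting.
    public static void EnsureConfigured(params string[] requiredNames)
    {
        var missing = FindMissing(requiredNames, Environment.GetEnvironmentVariable);
        if (missing.Count > 0)
        {
            throw new InvalidOperationException("Missing app settings: " + string.Join(", ", missing));
        }
    }
}
```

For example, GetAccessToken could start with SettingsSketch.EnsureConfigured("AuthorityUrl", "ClientId", "ClientSecret", "TenantId", "CRMOrganization") so that a misconfigured deployment fails with a clear message rather than an authentication error.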

The code above uses the following namespaces:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Newtonsoft.Json;
using Xrm.Tools.WebAPI;
using Xrm.Tools.WebAPI.Requests;
Install the NuGet packages that provide the namespaces above.

Now that we have written the code, we can follow the steps discussed here to deploy the functions to Azure: https://ajitpatra.com/2019/10/30/d365-azure-durable-function-with-d365-ce-part-3/.

After deployment, let’s change the Rate field on a Rate record to “6000”.

The change in the Rate field triggers the post-update plugin, which posts a custom message to the Azure Service Bus Queue.

When the message arrives in the queue, the Azure Function we created is triggered. We can check the function’s logs to verify that it received all the information we put in the message.

Hope it helps !!

6 thoughts on “Using Service Bus Triggered Azure Durable Function with D365 CE”

  1. In your ServiceBus triggered function you are setting the MessageReceiver as a static variable (receiver) so you can also use it in your orchestration function, right? Do you think this might have any memory effects?

    1. Hi Felix, yes, using the variable in the orchestration function is the main reason we used a static variable. We haven’t evaluated any impact on memory, though. However, we have since refactored our code a bit and no longer use those static variables; I’ve updated this blog accordingly.

      Thanks,
      Ajit
