In this post, we’ll see how to use a Service Bus triggered Azure Durable Function with D365 CE to perform CRUD operations.
Requirement: When the Rate Amount field (custom field) changes on a Rate record (custom entity), update the Rate field (custom field) on the Revenue Schedule Line records (custom entity) related to that Rate record.
Of course, there are multiple ways to achieve this; here we’ll use the following approach:
- A post-update plugin that posts a custom message to an Azure Service Bus queue, already discussed here: https://ajitpatra.com/2019/12/09/d365-post-custom-message-to-azure-service-bus-queue-c/. We can use this code in the post-update plugin of the Rate entity.
- A Service Bus triggered Azure Durable Function that reads the custom message posted to the queue and updates the related records back in D365 CE.
Once we see the custom message posted to the queue, we’ll proceed with creating the Service Bus triggered Azure Durable Function.
Please see the links below for more information on Azure Durable Functions and their application patterns.
https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=csharp
https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=csharp#fan-in-out
We’ll use the fan-out/fan-in application pattern in this example, in which we execute multiple functions in parallel and wait for all of them to complete.
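Before looking at the Durable Functions version, here is a minimal sketch of the same idea using plain .NET tasks. It is not the code used in this post: `FanOutFanInSketch`, `ProcessAsync` and `RunAsync` are hypothetical names, and the delay only simulates work that an activity function would do.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutFanInSketch
{
    // Hypothetical unit of work standing in for an activity function.
    public static async Task<bool> ProcessAsync(int recordId)
    {
        await Task.Delay(10);   // simulate I/O
        return recordId >= 0;   // assumption: negative ids count as failures
    }

    public static async Task<bool> RunAsync(IReadOnlyList<int> recordIds)
    {
        // Fan-out: start one task per record without awaiting each in turn.
        Task<bool>[] tasks = recordIds.Select(id => ProcessAsync(id)).ToArray();

        // Fan-in: wait for all tasks, then aggregate their results.
        bool[] results = await Task.WhenAll(tasks);
        return results.All(ok => ok);
    }
}
```

Calling `FanOutFanInSketch.RunAsync(new[] { 1, 2, 3 })` starts the three tasks together and completes once the slowest one finishes, which is the behaviour the orchestration function relies on.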
Below is the code for the Service Bus triggered function, which starts the orchestration function.
[FunctionName("UpdateGlobalRatesOnRSLs")]
public static async Task Run(
    [ServiceBusTrigger("%QueueName%", Connection = "AzureWebJobsServiceBus")] Message message,
    MessageReceiver messageReceiver,
    string lockToken,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log) // QueueName and AzureWebJobsServiceBus are key-value pairs in local.settings.json
{
    string inputMessage = Encoding.UTF8.GetString(message.Body);
    log.LogInformation($"message - {inputMessage}");

    if (string.IsNullOrWhiteSpace(inputMessage))
    {
        await messageReceiver.DeadLetterAsync(lockToken, "Message content is empty.", "Message content is empty.");
        log.LogInformation("Message Dead Lettered: Message content is empty.");
        return; // nothing to deserialize, so stop here
    }

    RateObjectConverted rateObjectConverted = DeserializeMessage(inputMessage);
    if (rateObjectConverted.RateID == null || rateObjectConverted.RevenueCodeID == null || rateObjectConverted.StartDate == null || rateObjectConverted.RateModel == null)
    {
        await messageReceiver.DeadLetterAsync(lockToken, "Missing Input.");
        log.LogInformation("Message Dead Lettered: Missing Input.");
        return;
    }

    // Compress the input and start the orchestration.
    var orchestrationInput = SerializeAndCompressOrchestrationInput<RateObjectConverted>(rateObjectConverted);
    string instanceId = await starter.StartNewAsync<string>("UpdateGlobalRatesOnRSLs_OrchestrationFunction", orchestrationInput);
    log.LogInformation($"Orchestration Started with ID: {instanceId}");

    var orchestrationStatus = await starter.GetStatusAsync(instanceId);
    var status = orchestrationStatus.RuntimeStatus.ToString().ToUpper();
    log.LogInformation($"Waiting to complete Orchestration function [Status:{status}][ID:{instanceId}]");

    // Poll once per second until the orchestration leaves the Pending/Running states.
    while (status == "PENDING" || status == "RUNNING")
    {
        await Task.Delay(1000);
        orchestrationStatus = await starter.GetStatusAsync(instanceId);
        status = orchestrationStatus.RuntimeStatus.ToString().ToUpper();
    }
    log.LogInformation($"UpdateGlobalRatesOnRSLs Function completed [Instance ID:{instanceId}]");

    // Output is a bool only when the orchestration completed; a failed or terminated run counts as unsuccessful.
    bool succeeded = orchestrationStatus.RuntimeStatus == OrchestrationRuntimeStatus.Completed
        && (bool)orchestrationStatus.Output;
    if (!succeeded)
    {
        await messageReceiver.AbandonAsync(lockToken); // release the lock so the message is delivered again
    }
}
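The function dead-letters the message if the body is empty or any required input is missing. Otherwise it starts the orchestration function, polls until the orchestration finishes, and abandons the message if the orchestration did not succeed.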
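The polling loop in the function above has no upper bound, so a stuck orchestration would keep the function (and the message lock) busy indefinitely. One possible way to bound it is sketched below. This is an illustration rather than part of the original solution: `PollingSketch` and `PollUntilAsync` are hypothetical names, and the helper is written against plain delegates so that it does not depend on the Durable Functions types.

```csharp
using System;
using System.Threading.Tasks;

public static class PollingSketch
{
    // Calls getStatus repeatedly until isDone returns true or the timeout elapses.
    // Returns the last status observed and whether it finished in time.
    public static async Task<(T Status, bool Completed)> PollUntilAsync<T>(
        Func<Task<T>> getStatus,
        Func<T, bool> isDone,
        TimeSpan interval,
        TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        while (true)
        {
            T status = await getStatus();
            if (isDone(status)) return (status, true);
            if (DateTime.UtcNow >= deadline) return (status, false);
            await Task.Delay(interval);
        }
    }
}
```

In the trigger function, `getStatus` could wrap `starter.GetStatusAsync(instanceId)` and `isDone` could check that the runtime status is neither Pending nor Running. A sensible timeout probably needs to stay within the time for which the Service Bus message lock is held or renewed; the right value depends on the queue and host configuration.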
Below is the code for the orchestration function, which receives the compressed “rateObjectConverted” as input and passes each related record to the activity function.
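[FunctionName("UpdateGlobalRatesOnRSLs_OrchestrationFunction")]
public static async Task<bool> RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    // The input arrives as a compressed JSON string; restore the RateObjectConverted instance.
    var compressedInput = context.GetInput<string>();
    var input = StringCompressor.DecompressString(compressedInput);
    var rateObjectConverted = JsonConvert.DeserializeObject<RateObjectConverted>(input);

    // Note: this query is I/O and runs again on every replay of the orchestrator.
    // Durable Functions guidance is to keep orchestrator code deterministic, so consider moving it into an activity function.
    var results = GetRSLs(rateObjectConverted, log).ToList();
    if (results.Count <= 0) return true;

    log.LogInformation($"Revenue Schedule Lines Count: {results.Count}");

    // Fan-out: schedule one activity per Revenue Schedule Line without awaiting each call.
    var tasks = new Task<bool>[results.Count];
    for (int i = 0; i < results.Count; i++)
    {
        tasks[i] = context.CallActivityAsync<bool>("UpdateGlobalRatesOnRSLs_ActivityFunction", results[i]);
    }

    // Fan-in: wait for every activity to finish; succeed only if all of them did.
    await Task.WhenAll(tasks);
    var allSuccessful = tasks.All(t => t.Result);
    return allSuccessful;
}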
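The orchestration function gets the Revenue Schedule Line records related to the Rate record and calls the activity function once for each record found. It returns true only when every activity call succeeds; the Service Bus triggered function uses that result to decide whether to abandon the message.
The for loop that calls CallActivityAsync without awaiting each call (fan-out) and the Task.WhenAll that follows it (fan-in) show the usage of the fan-out/fan-in application pattern.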
Here’s the code for the activity function, which updates each record passed from the orchestration function.
[FunctionName("UpdateGlobalRatesOnRSLs_ActivityFunction")]
public static async Task<bool> DoAction([ActivityTrigger] RSLObject entity, ILogger log)
{
    var crmApi = await CRMWebApiHelper.GetCRMWebAPI(log);
    try
    {
        // Fields to update on the Revenue Schedule Line.
        var dictionary = new Dictionary<string, object>();
        dictionary.Add("dxc_rate", 500); // static value for simplicity
        await crmApi.Update("dxc_revenueschedulelines", entity.RSLID, dictionary);

        // Link the Revenue Schedule Line to the Rate record that triggered the update.
        await crmApi.Associate("dxc_revenueschedulelines", entity.RSLID, "dxc_revenuescheduleline_dxc_rate", "dxc_rates", entity.RateObjectConverted.RateID.Value);
        log.LogInformation($"RSL ID:{entity.RSLID} processed.");
        return true;
    }
    catch (Exception ex)
    {
        // Report the failure to the orchestrator instead of throwing.
        // LogError records the exception, including its stack trace, at error severity.
        log.LogError(ex, "Error processing RSL ID:{RslId}", entity.RSLID);
        return false;
    }
}
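The functions above use three data classes that are not listed in this post. The sketch below is inferred from how their properties are used (for example, `RateID.Value` and the null checks in the trigger function), so treat the exact types as assumptions: in particular, the nullable types for `EndDate`, `RateAmount` and `RatePercentage` are a guess.

```csharp
using System;

// Raw shape of the queue message: every value is read as a string (assumption).
public class RateObject
{
    public string RateID { get; set; }
    public string RevenueCodeID { get; set; }
    public string StartDate { get; set; }
    public string EndDate { get; set; }
    public string RateAmount { get; set; }
    public string RatePercentage { get; set; }
    public string RateModel { get; set; }
}

// Typed version of the message; null means the value was missing from the message.
public class RateObjectConverted
{
    public Guid? RateID { get; set; }
    public Guid? RevenueCodeID { get; set; }
    public DateTime? StartDate { get; set; }
    public DateTime? EndDate { get; set; }
    public decimal? RateAmount { get; set; }
    public decimal? RatePercentage { get; set; }
    public int? RateModel { get; set; }
}

// One Revenue Schedule Line plus the rate data needed to update it.
public class RSLObject
{
    public Guid RSLID { get; set; }
    public DateTime StartDate { get; set; }
    public RateObjectConverted RateObjectConverted { get; set; }
}
```

Keeping the raw and converted shapes separate lets the trigger function treat a missing value as null and dead-letter the message before any orchestration starts.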
Below are some helper methods used in the above functions:
public static RateObjectConverted DeserializeMessage(string message)
{
    RateObjectConverted rateObjectConverted = new RateObjectConverted();
    var rateObject = JsonConvert.DeserializeObject<RateObject>(message);

    // Convert each string value only when it is present; missing values stay null.
    if (!string.IsNullOrWhiteSpace(rateObject.EndDate)) rateObjectConverted.EndDate = Convert.ToDateTime(rateObject.EndDate);
    if (!string.IsNullOrWhiteSpace(rateObject.StartDate)) rateObjectConverted.StartDate = Convert.ToDateTime(rateObject.StartDate);
    if (!string.IsNullOrWhiteSpace(rateObject.RateID)) rateObjectConverted.RateID = new Guid(rateObject.RateID);
    if (!string.IsNullOrWhiteSpace(rateObject.RevenueCodeID)) rateObjectConverted.RevenueCodeID = new Guid(rateObject.RevenueCodeID);
    if (!string.IsNullOrWhiteSpace(rateObject.RateAmount)) rateObjectConverted.RateAmount = Convert.ToDecimal(rateObject.RateAmount);
    if (!string.IsNullOrWhiteSpace(rateObject.RatePercentage)) rateObjectConverted.RatePercentage = Convert.ToDecimal(rateObject.RatePercentage);
    if (!string.IsNullOrWhiteSpace(rateObject.RateModel)) rateObjectConverted.RateModel = Convert.ToInt32(rateObject.RateModel);
    return rateObjectConverted;
}
public static string SerializeAndCompressOrchestrationInput<T>(T input)
{
    var inputString = JsonConvert.SerializeObject(input);
    var compressedString = StringCompressor.CompressString(inputString);
    return compressedString;
}
private static IEnumerable<RSLObject> GetRSLs(RateObjectConverted rateObjectConverted, ILogger log)
{
    // The orchestration function calls GetRSLs(rateObjectConverted, log) without an API client,
    // so the client is obtained here through the same helper the activity function uses.
    var api = CRMWebApiHelper.GetCRMWebAPI(log).GetAwaiter().GetResult();

    var xml = @"<fetch version='1.0' output-format='xml-platform' mapping='logical' distinct='true'>
<entity name='dxc_revenuescheduleline'>
<attribute name='dxc_revenueschedulelineid' />
<attribute name='dxc_name' />
<attribute name='dxc_startdate' />
<filter type='and'>
<condition attribute='dxc_revenuecodeid' operator='eq' uitype='dxc_revenuecode' value='" + rateObjectConverted.RevenueCodeID.ToString() + @"' />
</filter>
</entity>
</fetch>";
    log.LogInformation($"Revenue Schedule Line FetchXML: {xml}");

    var options = new CRMGetListOptions()
    {
        FormattedValues = true,
        FetchXml = xml
    };

    // GetList returns a Task; reading .Result below blocks until the query completes.
    var result = api.GetList("dxc_revenueschedulelines", options);
    if (result == null || result.Result == null || result.Result.List.Count == 0)
        return new List<RSLObject>();

    return (from r in result.Result.List.Cast<IDictionary<string, object>>()
            select new RSLObject
            {
                RSLID = Guid.Parse(r["dxc_revenueschedulelineid"].ToString()),
                StartDate = r.ContainsKey("dxc_startdate") ? Convert.ToDateTime(r["dxc_startdate"]) : DateTime.UtcNow.Date,
                // Property name matches entity.RateObjectConverted as read by the activity function.
                RateObjectConverted = rateObjectConverted
            });
}
// Called as CRMWebApiHelper.GetCRMWebAPI(log) by the activity function, so it is public and accepts the logger.
public static async Task<CRMWebAPI> GetCRMWebAPI(ILogger log)
{
    var crmOrganizationUrl = Environment.GetEnvironmentVariable("CRMOrganization");
    var crmOrganizationVersionUrl = Environment.GetEnvironmentVariable("CRMOrganizationVersionUrl");
    var crmUrl = $"{crmOrganizationUrl}{crmOrganizationVersionUrl}";
    log.LogInformation($"Connecting to CRM Web API: {crmUrl}");
    var resultAccessToken = await GetAccessToken();
    var crmApi = new CRMWebAPI(crmUrl, resultAccessToken);
    return crmApi;
}
private static async Task<string> GetAccessToken()
{
    var authority = Environment.GetEnvironmentVariable("AuthorityUrl");
    var clientId = Environment.GetEnvironmentVariable("ClientId");
    var crmBaseUrl = Environment.GetEnvironmentVariable("CRMOrganization");
    var clientSecret = Environment.GetEnvironmentVariable("ClientSecret");
    var tenantId = Environment.GetEnvironmentVariable("TenantId");

    // Client-credentials flow: the app registration authenticates with its own id and secret.
    var clientCredential = new ClientCredential(clientId, clientSecret);
    var authContext = new AuthenticationContext($"{authority}{tenantId}");
    var authResult = await authContext.AcquireTokenAsync(crmBaseUrl, clientCredential);
    return authResult.AccessToken;
}
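The `StringCompressor` class used by the trigger and orchestration functions is not listed in this post. As a stand-in, here is a minimal sketch that assumes GZip compression with Base64 encoding; the implementation actually used in the project may differ.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical stand-in for the StringCompressor helper referenced above.
public static class StringCompressor
{
    // GZip-compresses the UTF-8 bytes of the text and returns them as Base64.
    public static string CompressString(string text)
    {
        byte[] raw = Encoding.UTF8.GetBytes(text);
        using (var output = new MemoryStream())
        {
            // Dispose the GZipStream before reading the buffer so all data is flushed.
            using (var gzip = new GZipStream(output, CompressionMode.Compress, leaveOpen: true))
            {
                gzip.Write(raw, 0, raw.Length);
            }
            return Convert.ToBase64String(output.ToArray());
        }
    }

    // Reverses CompressString: Base64-decode, then gunzip back to the original text.
    public static string DecompressString(string compressedText)
    {
        byte[] compressed = Convert.FromBase64String(compressedText);
        using (var input = new MemoryStream(compressed))
        using (var gzip = new GZipStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            gzip.CopyTo(output);
            return Encoding.UTF8.GetString(output.ToArray());
        }
    }
}
```

Compression mainly pays off for large inputs. For a message as small as the one in this example, the Base64 output can be longer than the original JSON.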
The environment variables used above come from the local.settings.json file in the project, as shown below:
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "CRMOrganization": "https://<crmOrgName>.crm.dynamics.com",
    "ClientSecret": "<clientSecret>",
    "ClientId": "<clientID>",
    "TenantId": "<tenantID>",
    "AuthorityUrl": "https://login.microsoftonline.com/",
    "CRMOrganizationVersionUrl": "/api/data/v9.1/",
    "QueueName": "<QueueName>",
    "AzureWebJobsServiceBus": "<PrimaryConnectionStringOfSharedAccessPolicyOfServiceBusNamespace>"
  }
}
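When the project runs locally, the entries under "Values" are exposed to the code as environment variables; once deployed, the same keys have to exist as application settings of the Function App. A missing key makes `Environment.GetEnvironmentVariable` return null, which tends to surface later as a confusing authentication or URL error. The small helper below is a sketch of one way to fail early instead; `RequiredSettings` and `Get` are hypothetical names, not part of the original code.

```csharp
using System;

public static class RequiredSettings
{
    // Returns the setting value, or throws a descriptive error if it is missing or blank.
    public static string Get(string name)
    {
        string value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException($"App setting '{name}' is missing or empty.");
        }
        return value;
    }
}
```

With this in place, a call such as `RequiredSettings.Get("ClientId")` could replace the direct `Environment.GetEnvironmentVariable("ClientId")` lookups in the helper methods.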

Use the below namespaces for the code written above:
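using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask; // IDurableOrchestrationClient, IDurableOrchestrationContext
using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // ClientCredential, AuthenticationContext
using Newtonsoft.Json;
using Xrm.Tools.WebAPI;
using Xrm.Tools.WebAPI.Requests;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.ServiceBus;
using System.Text;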
Use the NuGet packages shown below:

Now that we have written the code, we can follow the steps discussed here to deploy the functions to Azure: https://ajitpatra.com/2019/10/30/d365-azure-durable-function-with-d365-ce-part-3/.
After deployment, let’s change the Rate field on a Rate record to “6000” as shown:

The change in the Rate field triggers the post-update plugin described above, which posts the custom message to the Azure Service Bus queue.
When the message arrives in the queue, the Azure function that we created is triggered, and we can check the function’s logs to verify that it received all the information we put in the message, as shown:

Hope it helps!
Do you think there are any issues with using a static MessageReceiver?
I have used MessageReceiver in this example. Could you please explain what you mean by a static MessageReceiver?
In your ServiceBus triggered function you are setting the MessageReceiver as a static variable (receiver) so you can also use it in your orchestration function, right? Do you think this might have any memory effects?
Hi Felix, yes, using the variable in the orchestration function is the main reason we used a static variable. We have not evaluated any impact on memory, though. However, we have since refactored our code a bit and no longer use those static variables; I’ve updated the code in this post accordingly.
Thanks,
Ajit