Subscribe to Azure Service Bus Queues using .NET 6 Windows Service

Prabhas Inamdar
6 min read · Jul 18, 2023

Overview

This article describes how to integrate different applications using Azure Service Bus queues. Before getting into the technical details, let’s explore the key use cases.

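Use case 1: Integration with applications that need to communicate with systems outside their corporate network. On-premise data centers often have firewall restrictions, especially on inbound connectivity. Because Service Bus clients initiate outbound connections to Azure, no inbound firewall ports need to be opened.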

Use case 2: Decoupling different components of applications by using a message-based pattern to enable scalability, reliability, and asynchronous processing.

In reference to use case 1, let’s consider two partner applications hosted in their respective on-premise environments that want to exchange data. In the following architecture, the two partner applications will utilize Azure Service Bus Queue for data exchange.

High-Level architecture

  1. Publisher: An API writes the JSON message to the sender queue.
  2. Subscriber: The Windows service deployed in On-Premise 2 registers a session message handler that listens for messages written to the sender queue. It receives each message and processes it without having to poll the queue.
  3. Publisher: The Windows service deployed in On-Premise 2 writes the response to the receiver queue.
  4. Subscriber: The Windows service deployed in On-Premise 1 registers a session message handler that listens for messages written to the receiver queue. It processes each response received in the receiver queue.

Benefits of this architecture

  • Real-time message synchronization
  • Event-driven architecture pattern
  • Transaction-based processing
  • Proactive monitoring and telemetry: Use Azure Monitor to raise alerts when messages remain unprocessed beyond a threshold duration.
  • Batch mode and parallelism for sending large messages.
  • Asynchronous processing for better performance.
  • Scalability: Azure Service Bus is a fully managed service that scales with message volume.

Publisher / Subscriber with Azure Service Bus Queue

This event-driven solution adopts a distributed systems architecture.

When building the solution, it is useful to design message exchanges in terms of publishers and subscribers (or producers and consumers).

Publishers are clients (including traditional client applications) that write event messages to a known Service Bus queue endpoint.

Subscribers can be implemented as continuously running background services that are interested in these messages. They subscribe to these messages and process them. When the consuming service is running, it will read messages from the queue and dispatch work items accordingly.

Publisher capabilities

The Publisher can be a library, API, or function. Its WriteMessageToServiceBusEntity method performs the following key functions:

  • Writes a message to the Azure Service Bus queue.
  • Sends all messages to the queue inside a single TransactionScope, so either every message is written to the queue or the whole operation fails.
  • Creates the Service Bus client object from the configured endpoint value (the single-argument ServiceBusClient constructor expects a connection string). The client is then used to create a ServiceBusSender, which is responsible for writing messages to the queue.
  • Creates a session for the submitted messages by setting the SessionId property to a unique transaction ID. For example: DateTime.Now.ToString("yyyyMMddHHmmssfff") + UserId, which keeps the ID unique per user and timestamp.
  • In the Standard tier, an Azure Service Bus message has a maximum size of 256 KB, including headers and properties. Therefore, messages larger than 255 KB need to be split into a sequence of smaller sub-messages that can be transmitted over the Service Bus messaging entities. All the sub-messages have the same SessionId.

public async Task<int> WriteMessageToServiceBusEntity<T>(T serviceBusMessageDto, CancellationToken stoppingToken)
{
    int numberOfSubMessages = 0;
    int subMessageNumber = 1;

    var serviceBusMessage = (ServiceBusMessageDto)((dynamic)serviceBusMessageDto);
    var msgBody = serviceBusMessage.Message;
    var sessionID = serviceBusMessage.SessionID;

    // create a Service Bus client
    await using (ServiceBusClient client = new ServiceBusClient(serviceBusMessage.SeriveBusEndpointUrl))
    {
        // create a sender for the queue
        ServiceBusSender sender = client.CreateSender(serviceBusMessage.ServiceBusEntityName);
        Queue<ServiceBusMessage> messages = new Queue<ServiceBusMessage>();

        // number of sub-messages = messageSize / SubMessageBodySize, rounded up
        var messageSize = Encoding.UTF8.GetByteCount(msgBody);
        numberOfSubMessages = (messageSize / SubMessageBodySize);

        if (messageSize % SubMessageBodySize != 0)
        {
            numberOfSubMessages++;
        }

        using Stream bodyStream = new MemoryStream(Encoding.UTF8.GetBytes(msgBody));
        for (int streamOffset = 0; streamOffset < messageSize; streamOffset += SubMessageBodySize)
        {
            // the last sub-message may be smaller than SubMessageBodySize
            int arraySize = Math.Min(SubMessageBodySize, messageSize - streamOffset);

            // read the bytes for this sub-message body
            byte[] subMessageBytes = new byte[arraySize];
            int result = bodyStream.Read(subMessageBytes, 0, arraySize);

            _logger.LogInformation("Number of bytes read: {BytesRead}", result);

            var subMessageBody = new ServiceBusMessage(subMessageBytes);

            // all sub-messages share the session ID; each gets its own message ID
            subMessageBody.SessionId = serviceBusMessage.SessionID;
            subMessageBody.MessageId = Guid.NewGuid().ToString();

            subMessageBody.ApplicationProperties.Add("MESSAGETRANSACTIONID", serviceBusMessage.MessageTransactionID);
            subMessageBody.ApplicationProperties.Add("CONSUMERNAME", serviceBusMessage.ConsumerName);
            subMessageBody.ApplicationProperties.Add("NUMBEROFSUBMESSAGES", numberOfSubMessages);
            subMessageBody.ApplicationProperties.Add("SUBMESSAGENUMBER", subMessageNumber);
            subMessageBody.Subject = serviceBusMessage.MessageLabel;
            if (subMessageNumber == numberOfSubMessages)
            {
                // mark the last sub-message so the receiver can detect the end of the sequence
                subMessageBody.Subject = serviceBusMessage.MessageLabel + "-Final";
            }

            messages.Enqueue(subMessageBody);
            subMessageNumber++;
        }

        using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled))
        {
            // total number of messages to be sent to the Service Bus queue
            int messageCount = messages.Count;

            // while all messages are not sent to the Service Bus queue
            while (messages.Count > 0)
            {
                // start a new batch
                using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync(stoppingToken);

                // add the first message to the batch
                if (messageBatch.TryAddMessage(messages.Peek()))
                {
                    // dequeue the message from the .NET queue once the message is added to the batch
                    messages.Dequeue();
                }
                else
                {
                    // if the first message can't fit, then it is too large for the batch
                    var errorMessage = $"Sub-message at index {messageCount - messages.Count} for SessionID {sessionID} is too large and cannot be sent.";
                    _logger.LogCritical(errorMessage);
                    throw new ArgumentException(errorMessage);
                }

                // add as many messages as possible to the current batch
                while (messages.Count > 0 && messageBatch.TryAddMessage(messages.Peek()))
                {
                    // dequeue the message from the .NET queue as it has been added to the batch
                    messages.Dequeue();
                }

                // now, send the batch
                await sender.SendMessagesAsync(messageBatch, stoppingToken);
                // if there are any remaining messages in the .NET queue, the while loop repeats
            }

            // commit: without this call, the sends are rolled back when the scope is disposed
            scope.Complete();
        }
    }

    return numberOfSubMessages;
}
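
The session ID and splitting steps do not depend on the Service Bus SDK, so they can be tried in isolation. The following is a minimal sketch rather than the author's implementation: the MessageChunker class and its method names are hypothetical, and the chunk size is passed as a parameter because the value of SubMessageBodySize is not shown in the article.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

// Hypothetical helper (not part of the article's source) that isolates the
// session ID and chunking arithmetic used by WriteMessageToServiceBusEntity.
public static class MessageChunker
{
    // Builds a session ID from a timestamp and a user ID, following the
    // article's "yyyyMMddHHmmssfff" + UserId example.
    public static string BuildSessionId(DateTime timestamp, string userId)
        => timestamp.ToString("yyyyMMddHHmmssfff", CultureInfo.InvariantCulture) + userId;

    // Number of sub-messages needed for messageSize bytes (ceiling division).
    public static int CountSubMessages(int messageSize, int subMessageBodySize)
    {
        if (subMessageBodySize <= 0)
            throw new ArgumentOutOfRangeException(nameof(subMessageBodySize));
        return (messageSize + subMessageBodySize - 1) / subMessageBodySize;
    }

    // Splits the UTF-8 bytes of the body into chunks of at most
    // subMessageBodySize bytes; only the last chunk may be shorter.
    public static List<byte[]> Split(string body, int subMessageBodySize)
    {
        byte[] bytes = Encoding.UTF8.GetBytes(body);
        var chunks = new List<byte[]>(CountSubMessages(bytes.Length, subMessageBodySize));
        for (int offset = 0; offset < bytes.Length; offset += subMessageBodySize)
        {
            int size = Math.Min(subMessageBodySize, bytes.Length - offset);
            var chunk = new byte[size];
            Buffer.BlockCopy(bytes, offset, chunk, 0, size);
            chunks.Add(chunk);
        }
        return chunks;
    }
}
```

Because the split happens on byte boundaries, a multi-byte UTF-8 character can end up divided between two sub-messages. A receiver should therefore concatenate the bytes of all sub-messages before decoding them back to a string.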

Subscriber capabilities

The Subscriber is the continuously running background service that can be deployed on-premise. It receives messages from the queue and processes them without the need to continuously poll the queue.

In the project, the DoWork method creates multiple session queue clients based on the queue configuration. Each queue client will register a message handler, which will be executed when a message arrives in the queue.

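private async Task DoWork(CancellationToken stoppingToken)
{
    _logger.LogInformation(
        "Consume Scoped Service Hosted Service is working.");

    using (var scope = Services.CreateScope())
    {
        var scopedServiceBusService =
            scope.ServiceProvider
                .GetRequiredService<IServiceBusMessage<ServiceBusClientConfigDto>>();

        var queueConfigurationList = await GetServiceBusQueueConfiguration();

        if (queueConfigurationList != null && queueConfigurationList.Count > 0)
        {
            // Start one session queue client per configured queue (requires System.Linq).
            // CreateSessionQueueClient returns a Task that runs until cancellation.
            // Parallel.ForEach would not await these tasks, so the scope would be
            // disposed while the processors are still running.
            var clientTasks = queueConfigurationList
                .Select(queueData => scopedServiceBusService.CreateSessionQueueClient<ServiceBusClientConfigDto>(
                    new List<ServiceBusClientConfigDto> { queueData }, stoppingToken))
                .ToList();
            _logger.LogInformation("CreateServiceBusQueueClient Method: Initialization Completed.");

            // keep the scope alive until every client has stopped
            await Task.WhenAll(clientTasks);
        }
    }
}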

The key configuration parameters to note are:
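1. ServiceBusSessionProcessorOptions.ReceiveMode is set to PeekLock. The message stays locked while the handler runs and is removed from the queue only when it is completed. Because AutoCompleteMessages is set to false in the code below, the message handler completes the message explicitly after successful processing.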

2. MaxConcurrentSessions: This parameter sets how many sessions the processor handles concurrently, which improves throughput when many sessions are active.

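3. MaxConcurrentCallsPerSession: This parameter sets the maximum number of concurrent calls to the message handler that the processor will initiate per session. It is set to 1 so that the messages of a session are processed one at a time, in order.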

public async Task CreateSessionQueueClient<T>(IEnumerable<T> queueConfiguration, CancellationToken stoppingToken)
{
    ServiceBusSessionProcessor processor = null;
    try
    {
        var queueConfigModel = (ServiceBusClientConfigDto)((dynamic)queueConfiguration.FirstOrDefault());

        var receiverQueueEndPointUrl = queueConfigModel.ReceiverQueueEndPointUrl;
        var receiverQueueName = queueConfigModel.ReceiverQueueName;
        var noOfConcurrentSessions = queueConfigModel.ReceiverMaxConcurrentSession;
        var prefetchCount = queueConfigModel.ReceiverPreFetchCount;

        _logger.LogInformation("QueueRegistration: " + receiverQueueEndPointUrl + "-" + receiverQueueName);

        var options = new ServiceBusClientOptions
        {
            TransportType = ServiceBusTransportType.AmqpTcp,
            RetryOptions = new ServiceBusRetryOptions()
            {
                Mode = ServiceBusRetryMode.Exponential,
                Delay = TimeSpan.FromSeconds(30),
                MaxRetries = 20,
                MaxDelay = TimeSpan.FromMinutes(5),
                TryTimeout = TimeSpan.FromSeconds(60)
            }
        };

        await using (ServiceBusClient client = new ServiceBusClient(receiverQueueEndPointUrl, options))
        {
            var receiverOptions = new ServiceBusSessionProcessorOptions
            {
                // By default (AutoCompleteMessages = true), the processor completes the message
                // after the message handler returns. Setting it to false gives fine-grained
                // control: the handler settles the message on its own, see
                // https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
                // In both cases, if the message handler throws an exception without settling
                // the message, the processor will abandon the message.
                AutoCompleteMessages = false,

                // allow multiple sessions to be processed concurrently
                MaxConcurrentSessions = noOfConcurrentSessions,

                PrefetchCount = prefetchCount,

                MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(60),

                // process the messages of a session one at a time
                MaxConcurrentCallsPerSession = 1,

                ReceiveMode = ServiceBusReceiveMode.PeekLock

                // Processing can be optionally limited to a subset of session Ids.
                //SessionIds = { "my-session", "your-session" },
            };
            // create a processor that we can use to process the messages
            processor = client.CreateSessionProcessor(receiverQueueName, receiverOptions);

            // add handler to process messages
            processor.ProcessMessageAsync += MessageHandler;

            // add handler to process any errors
            processor.ProcessErrorAsync += ErrorHandler;

            // start processing
            await processor.StartProcessingAsync();
            _logger.LogInformation("Start Receiving messages");

            // keep the processor running until the service is asked to stop
            while (!stoppingToken.IsCancellationRequested)
            {
                await Task.Delay(TimeSpan.FromSeconds(1));
            }

            // stop processing
            _logger.LogInformation("Stopping Receiving messages");

            await processor.StopProcessingAsync();

            _logger.LogInformation("Stopped Receiving messages");
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Exception: {Message}", ex.Message);
        await Task.Delay(10000, stoppingToken);
    }
    finally
    {
        // processor is still null if the exception occurred before it was created
        if (processor != null)
        {
            // remove the message handler
            processor.ProcessMessageAsync -= MessageHandler;

            // remove the error handler
            processor.ProcessErrorAsync -= ErrorHandler;
        }
    }
}

// handle received messages
async Task MessageHandler(ProcessSessionMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    _logger.LogInformation($"Received: {body}");

    var isOperationSuccess = await ProcessMessage(args.Message, args.CancellationToken);
    if (isOperationSuccess) // delete only when the message has been saved in the table
    {
        // complete the message so that it is deleted from the queue
        await args.CompleteMessageAsync(args.Message);
    }
}

// handle any errors when receiving messages
Task ErrorHandler(ProcessErrorEventArgs args)
{
    _logger.LogError(args.Exception, "Error while receiving messages.");
    return Task.CompletedTask;
}
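
The ProcessMessage method is not shown in the article. Since the publisher tags every sub-message with NUMBEROFSUBMESSAGES and SUBMESSAGENUMBER, one plausible way for the subscriber to rebuild a large message is to buffer the parts per session until all of them have arrived. The following is a minimal sketch under that assumption; the SubMessageAssembler class and its TryAdd method are hypothetical and are not taken from the author's repository.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical helper: buffers the sub-messages of each session and rebuilds
// the original body once every part has arrived.
public sealed class SubMessageAssembler
{
    private readonly object _gate = new object();

    // sessionId -> (subMessageNumber -> body bytes), kept sorted by part number
    private readonly Dictionary<string, SortedDictionary<int, byte[]>> _parts =
        new Dictionary<string, SortedDictionary<int, byte[]>>();

    // Stores one part. Returns true and sets completeBody when the session is
    // complete; otherwise returns false and sets completeBody to an empty string.
    public bool TryAdd(string sessionId, int subMessageNumber, int numberOfSubMessages,
                       byte[] body, out string completeBody)
    {
        lock (_gate) // several sessions may be processed concurrently
        {
            if (!_parts.TryGetValue(sessionId, out var parts))
            {
                parts = new SortedDictionary<int, byte[]>();
                _parts[sessionId] = parts;
            }

            // a redelivered part overwrites the earlier copy
            parts[subMessageNumber] = body;

            if (parts.Count < numberOfSubMessages)
            {
                completeBody = string.Empty;
                return false;
            }

            // concatenate the bytes in part order, then decode once so that
            // characters split across two parts are restored correctly
            byte[] allBytes = parts.Values.SelectMany(part => part).ToArray();
            _parts.Remove(sessionId);
            completeBody = Encoding.UTF8.GetString(allBytes);
            return true;
        }
    }
}
```

In a message handler, the arguments would come from args.Message.SessionId, the two application properties, and args.Message.Body.ToArray(). Note that this sketch keeps the parts in memory only, so a service restart in the middle of a session would lose the parts already completed; a durable store would be needed to cover that case.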

Source Code

Refer to the complete source code

https://github.com/prabhasinamdar/az-asb-worker-service

Prabhas Inamdar

Solution Architect driving app modernization & digital transformation. Azure Cloud specialist. Passionate about innovation.