I am trying to queue a series of tasks and run them asynchronously using Azure Service Fabric. I currently use an Azure Storage queue (CloudQueueMessage) with worker roles, and I am trying to migrate to Service Fabric. Here is my current worker-role code:
/// <summary>
/// Fetches at most one message from the worker-role queue, deletes it, and runs the task it describes.
/// </summary>
/// <remarks>
/// The message is deleted from the queue before the task is performed, so if
/// <c>PerformTask</c> throws, the message has already been removed.
/// Any exception raised while fetching, parsing, deleting, or performing is logged and not rethrown.
/// </remarks>
private void ExecuteTask()
{
    if (queue == null)
    {
        jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, "Queue for WorkerRole2 is null. Exiting."));
        return;
    }

    // Declared outside the try so the catch block can include the message text in the log entry.
    CloudQueueMessage message = null;
    try
    {
        message = queue.GetMessage();
        if (message == null)
        {
            // Nothing to process.
            return;
        }

        JMATask task = GetTask(message.AsString);

        // Delete first, then execute: a failing task is not left on the queue.
        queue.DeleteMessage(message);
        PerformTask(task);
    }
    catch (Exception ex)
    {
        string msg = (message == null) ? string.Empty : message.AsString;
        jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Message {0} Error removing message from queue {1}", msg, ex.ToString())));
    }
}
I have some questions:
Does the list need to be added to a queue?
namespace Stateful1
{
/// <summary>
/// A unit of work; instances of this type are the items held in the "JMATasks" reliable queue.
/// </summary>
public class JMATask
{
/// <summary>
/// Name associated with the task. How the name is interpreted is not shown in this file.
/// </summary>
public string Name { get; set; }
}
/// <summary>
/// An instance of this class is created for each service replica by the Service Fabric runtime.
/// </summary>
/// <remarks>
/// The service keeps a reliable queue named "JMATasks" and, roughly once per second,
/// dequeues one task from it and performs that task.
/// </remarks>
internal sealed class Stateful1 : StatefulService
{
    public Stateful1(StatefulServiceContext context)
        : base(context)
    { }

    /// <summary>
    /// Optional override to create listeners (e.g., HTTP, Service Remoting, WCF, etc.) for this service replica to handle client or user requests.
    /// </summary>
    /// <remarks>
    /// For more information on service communication, see http://aka.ms/servicefabricservicecommunication
    /// </remarks>
    /// <returns>A collection of listeners.</returns>
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
    {
        return new ServiceReplicaListener[0];
    }

    /// <summary>
    /// This is the main entry point for your service replica.
    /// This method executes when this replica of your service becomes primary and has write status.
    /// </summary>
    /// <remarks>
    /// Each loop iteration opens one transaction, tries to dequeue a task, performs it if one
    /// was present, and commits. The task is performed before the commit, so an exception from
    /// <see cref="PerformTask"/> leaves the transaction uncommitted.
    /// </remarks>
    /// <param name="cancellationToken">Canceled when Service Fabric needs to shut down this service replica.</param>
    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
        IReliableQueue<JMATask> tasks = await this.StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks");

        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            using (var tx = this.StateManager.CreateTransaction())
            {
                var result = await tasks.TryDequeueAsync(tx);

                // The queue may be empty; only perform a task when one was actually dequeued.
                if (result.HasValue)
                {
                    await PerformTask(result.Value);
                }

                // Placeholder producer so the sample always has work to dequeue.
                // NOTE(review): real tasks should be enqueued from a dedicated method, not from this loop.
                await tasks.EnqueueAsync(tx, new JMATask());

                // If an exception is thrown before calling CommitAsync, the transaction aborts, all changes are
                // discarded, and nothing is saved to the secondary replicas.
                await tx.CommitAsync();
            }

            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }
    }

    /// <summary>
    /// Performs a single dequeued task. The body is currently a placeholder.
    /// </summary>
    /// <remarks>
    /// Returns <see cref="Task"/> rather than <c>void</c> so that <see cref="RunAsync"/> can await
    /// completion and observe any exception thrown while performing the task.
    /// </remarks>
    /// <param name="task">The task taken from the queue.</param>
    /// <returns>A task that completes when the work has finished.</returns>
    private async Task PerformTask(JMATask task)
    {
        //execute task
    }
}
The RunAsync method should not contain this line of code: await tasks.EnqueueAsync(tx, new JMATask());
Creating the list of JMA tasks to queue up should be done in a separate method of your stateful service, which looks like this:
/// <summary>
/// Enqueues <paramref name="task"/> on the "JMATasks" reliable queue in its own transaction.
/// </summary>
/// <param name="task">The task to add to the queue.</param>
/// <returns>A task that completes once the enqueue has been committed.</returns>
public async Task AddJMATaskAsync(JMATask task)
{
    var tasksQueue = await StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks");
    using (var tx = StateManager.CreateTransaction())
    {
        // Deliberately no catch: a failure must reach the caller, otherwise the caller would
        // believe the task was queued. Disposing the transaction without a commit discards the enqueue.
        await tasksQueue.EnqueueAsync(tx, task);
        await tx.CommitAsync();
    }
}
And then your PerformTask method can contain a call to a stateless microservice:
/// <summary>
/// Outline of how a dequeued task could be handed off to a stateless microservice.
/// The body contains only comments describing the intended steps; no work is performed yet.
/// </summary>
/// <param name="task">The task to hand off.</param>
public async Task PerformTask (JMATask task)
{
//1. resolve stateless microservice URI
// statelessSvc
//2. call method of the stateless microservice
// statelessSvc.PerformTask(task);
}
So basically, the stateful service will only queue and dequeue the tasks. Performing the actual task can be done by a microservice which will be available to all nodes in the cluster.