I have a service that iterates over X actors, asking each of them for its state using the ActorProxy.
It's important to me that this service is not blocked waiting for some other long-running method in the actor, e.g. a reminder callback.
Is there some way to call the simple example GetState() below so that the method completes right away, without blocking in case some reminder is running?
class Actor : IMyActor
{
    public Task<MyState> GetState() => StateManager.GetStateAsync<MyState>("key");
}
Alternatively, what is the proper way for the service to make the call and, if the actor doesn't reply within 5 seconds, just continue?
var proxy = ActorProxy.Create<IMyActor>(actorId); // actorId: the ActorId of the actor to query
var state = await proxy.GetState(); // this will wait until the actor is ready to send back the state.
It is possible to read the actor state even for Actors that are currently executing a blocking method. Actors store their state using an IActorStateManager, which in turn uses an IActorStateProvider. The IActorStateProvider is instantiated once per ActorService. Each partition instantiates the ActorService that is responsible for hosting and running actors. The actor service is at its core a StatefulService (or rather StatefulServiceBase, which is the base class that a regular stateful service uses). With this in mind, we can work with the ActorService that caters to our Actors the same way we would work with a regular service, i.e. with a service interface based on IService.
The IActorStateProvider (implemented by KvsActorStateProvider if you are using Persisted state) has two methods that we can use:
Task<T> LoadStateAsync<T>(ActorId actorId, string stateName, CancellationToken cancellationToken = default(CancellationToken));
Task<PagedResult<ActorId>> GetActorsAsync(int numItemsToReturn, ContinuationToken continuationToken, CancellationToken cancellationToken);
Calls to these methods are not affected by actors' locks, which makes sense since these are designed to support all actors on a partition.
Example:
Create a custom ActorService and use that one to host your actors:
public interface IManyfoldActorService : IService
{
    Task<IDictionary<long, int>> GetCountsAsync(CancellationToken cancellationToken);
}

public class ManyfoldActorService : ActorService, IManyfoldActorService
{
    ...
}
Register the new ActorService in Program.Main:
ActorRuntime.RegisterActorAsync<ManyfoldActor>(
(context, actorType) => new ManyfoldActorService(context, actorType)).GetAwaiter().GetResult();
Assuming we have a simple Actor with the following method:
Task IManyfoldActor.SetCountAsync(int count, CancellationToken cancellationToken)
{
    // Block synchronously for 30 seconds to simulate a long-running call that holds the actor's lock.
    Task.Delay(TimeSpan.FromSeconds(30), cancellationToken).GetAwaiter().GetResult();
    var task = this.StateManager.SetStateAsync("count", count, cancellationToken);
    ActorEventSource.Current.ActorMessage(this, $"Finished set {count} on {this.Id.GetLongId()}");
    return task;
}
It waits for 30 seconds (to simulate long-running, blocking method calls) and then sets a state value "count" to an int.
In a separate service we can now call SetCountAsync for the Actors to generate some state data:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var actorProxyFactory = new ActorProxyFactory();
    long iterations = 0;
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();
        iterations += 1;
        var actorId = iterations % 10; // cycle through 10 actor IDs
        var count = Environment.TickCount % 100;
        var manyfoldActor = actorProxyFactory.CreateActorProxy<IManyfoldActor>(new ActorId(actorId));
        // Fire-and-forget: the returned task is deliberately not awaited.
        manyfoldActor.SetCountAsync(count, cancellationToken).ConfigureAwait(false);
        ServiceEventSource.Current.ServiceMessage(this.Context, $"Set count {count} on {actorId} @ {iterations}");
        await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken);
    }
}
This method simply loops endlessly, changing the values of the actors. (Note the correlation between the total of 10 actors, the 3-second delay here and the 30-second delay in the actor. It is designed this way simply to prevent an infinite buildup of Actor calls waiting for a lock.) Each call is also executed as fire-and-forget, so we can continue on to updating the state of the next actor before that one returns. It's a silly piece of code; it is just designed this way to prove the theory.
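One side effect of fire-and-forget is that a failed call goes unnoticed, because nobody looks at the returned task. A minimal sketch of one way to keep the fire-and-forget behaviour while still logging failures, using only the .NET task library; the `Forget` helper and its `onError` parameter are hypothetical names introduced here, not part of Service Fabric:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FireAndForgetSketch
{
    // Hypothetical helper: the caller moves on immediately, but a fault in the
    // background call is still reported through `onError`.
    public static void Forget(this Task task, Action<Exception> onError)
    {
        task.ContinueWith(
            t => onError(t.Exception.GetBaseException()),
            CancellationToken.None,
            // Only run the continuation when the antecedent task faulted.
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }
}
```

Under these assumptions, the loop above could call `manyfoldActor.SetCountAsync(count, cancellationToken).Forget(e => ServiceEventSource.Current.ServiceMessage(this.Context, e.ToString()));` instead of discarding the task.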
Now in the actor service we can implement the method GetCountsAsync like this:
public async Task<IDictionary<long, int>> GetCountsAsync(CancellationToken cancellationToken)
{
    ContinuationToken continuationToken = null;
    var actors = new Dictionary<long, int>();
    do
    {
        var page = await this.StateProvider.GetActorsAsync(100, continuationToken, cancellationToken);
        foreach (var actor in page.Items)
        {
            var count = await this.StateProvider.LoadStateAsync<int>(actor, "count", cancellationToken);
            actors.Add(actor.GetLongId(), count);
        }
        continuationToken = page.ContinuationToken;
    }
    while (continuationToken != null);
    return actors;
}
This uses the underlying ActorStateProvider to query for all known Actors (for that partition) and then reads the state for each one directly, thereby 'bypassing' the Actor and not being blocked by the actor's method execution.
The final piece is a method that can call our ActorService and invoke GetCountsAsync across all partitions:
public IDictionary<long, int> Get()
{
    var applicationName = FabricRuntime.GetActivationContext().ApplicationName;
    var actorServiceName = $"{typeof(IManyfoldActorService).Name.Substring(1)}";
    var actorServiceUri = new Uri($"{applicationName}/{actorServiceName}");
    var fabricClient = new FabricClient();
    var partitions = new List<long>();
    var servicePartitionList = fabricClient.QueryManager.GetPartitionListAsync(actorServiceUri).GetAwaiter().GetResult();
    foreach (var servicePartition in servicePartitionList)
    {
        var partitionInformation = servicePartition.PartitionInformation as Int64RangePartitionInformation;
        partitions.Add(partitionInformation.LowKey);
    }
    var serviceProxyFactory = new ServiceProxyFactory();
    var actors = new Dictionary<long, int>();
    foreach (var partition in partitions)
    {
        var actorService = serviceProxyFactory.CreateServiceProxy<IManyfoldActorService>(actorServiceUri, new ServicePartitionKey(partition));
        var counts = actorService.GetCountsAsync(CancellationToken.None).GetAwaiter().GetResult();
        foreach (var count in counts)
        {
            actors.Add(count.Key, count.Value);
        }
    }
    return actors;
}
Running this code now gives us 10 actors that each get their state updated every 33rd second, and where each actor is busy for 30 seconds each time. The Actor service sees the updated state as soon as each actor method returns.
Some things are omitted in this sample; for instance, when we load the state in the actor service we should probably guard against timeouts.
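A timeout guard of that kind, which also covers the "continue if there is no reply within 5 seconds" part of the question, could be sketched roughly as follows. This is a minimal sketch using only the .NET task library; `TryWithTimeoutAsync` is a hypothetical helper name, and it assumes the wrapped operation accepts a CancellationToken:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Hypothetical helper: returns (true, result) if `operation` finishes within
    // `timeout`, otherwise (false, default(T)) so the caller can simply continue.
    public static async Task<(bool Completed, T Result)> TryWithTimeoutAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        TimeSpan timeout,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            var work = operation(cts.Token);
            var delay = Task.Delay(timeout, cts.Token);

            // Whichever finishes first decides the outcome.
            var finished = await Task.WhenAny(work, delay).ConfigureAwait(false);

            // Stop the loser: the timer if the work won, the work if the timer won.
            cts.Cancel();

            if (finished == work)
            {
                // Awaiting again rethrows any exception from the operation.
                return (true, await work.ConfigureAwait(false));
            }

            // If the caller cancelled (rather than the timer firing), surface that.
            cancellationToken.ThrowIfCancellationRequested();

            // Observe a late fault from the abandoned operation so it is not left unobserved.
            _ = work.ContinueWith(t => { var ignored = t.Exception; },
                TaskContinuationOptions.OnlyOnFaulted);
            return (false, default(T));
        }
    }
}
```

In GetCountsAsync this might be used as `var result = await TimeoutSketch.TryWithTimeoutAsync(ct => this.StateProvider.LoadStateAsync<int>(actor, "count", ct), TimeSpan.FromSeconds(5), cancellationToken);`, skipping the actor when `result.Completed` is false. Note that a timeout only stops the caller from waiting; whether the underlying call actually stops depends on how it honours the token.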