I'm currently investigating Raft in dotNext and would like to move from the fairly simplistic example which registers all the nodes in the cluster at startup to using an announcer to notify the leader when a new node has joined.
To my understanding this means that I should start the initial node in ColdStart
but then subsequent nodes should use the ClusterMemberAnnouncer
to add to the cluster as:
services.AddTransient<ClusterMemberAnnouncer<UriEndPoint>>(serviceProvider => async (memberId, address, cancellationToken) =>
{
// Register the node with the configuration storage
var configurationStorage = serviceProvider.GetService<IClusterConfigurationStorage<UriEndPoint>>();
if (configurationStorage == null)
throw new Exception("Unable to resolve the IClusterConfigurationStorage when adding the new node member");
await configurationStorage.AddMemberAsync(memberId, address, cancellationToken);
});
It makes sense to me that the nodes should use a shared/persisted configuration storage so that when the second node tries to start up and announce itself, it's able to see the first cold-started active node in the cluster. However if I use the documented services.UsePersistentConfigurationStorage("configurationStorage")
approach and then run the nodes in separate console windows ie. separate processes, the second node understandably says:
The process cannot access the file 'C:\Projects\RaftTest\configurationStorage\active.list' because it is being used by another process.
Has anyone perhaps got an example of using an announcer in Raft dotnext? And does anyone know the best way (hopefully with an example) to use persistent cluster configuration storage so that separate processes (potentially running in different docker containers) are able to access the active list?
I wasn't successful in creating my own implementation of the storage since the nodes continually fought over the configuration. So the only way I found it worked with my POC was: Use in memory storage:
services
.UseInMemoryConfigurationStorage(AddClusterMembers)
.ConfigureCluster<ClusterConfigurator>()
.AddSingleton<IHttpMessageHandlerFactory, RaftClientHandlerFactory>()
.AddOptions()
.AddRouting()
.AddControllers();
Where AddClusterMembers
registers the first node with the cluster:
private static void AddClusterMembers(IDictionary<ClusterMemberId, UriEndPoint> members)
{
// Add at least one node to the cluster
var address = new UriEndPoint(new(ClusterConfigurator.OrchestrationNodeAddress, UriKind.Absolute));
members.Add(ClusterMemberId.FromEndPoint(address), address);
}
using the configuration in appsettings "OrchestrationNodeAddress": "https://localhost:3262"
.
I then created a controller with an endpoint to handle the registration of the node:
[Route("api/v1/cluster")]
[Produces("application/json")]
public class ClusterController: ControllerBase
{
private readonly IRaftHttpCluster _cluster;
private readonly IClusterConfigurationStorage<UriEndPoint> _configurationStorage;
private readonly ISupplier<NodeTenantLogEntry> _supplier;
public ClusterController(IRaftHttpCluster cluster, ISupplier<NodeTenantLogEntry> supplier, IClusterConfigurationStorage<UriEndPoint> configurationStorage)
{
_cluster = cluster;
_supplier = supplier;
_configurationStorage = configurationStorage;
}
[HttpPost]
[Route("register-node")]
public async Task<IActionResult> Register([FromBody] RegisterNodeMessage message, CancellationToken token = default)
{
Console.WriteLine($"Registering Node {message.NodeId} with address {message.Address}");
if (!ClusterMemberId.TryParse(message.NodeId, out var clusterMemberId))
{
throw new Exception($"Unable to parse nodeId {message.NodeId}");
}
var address = new Uri(message.Address);
// Add the node to the proposed configuration
await _configurationStorage.AddMemberAsync(clusterMemberId, new UriEndPoint(address), token);
// Add the node to the cluster - this prompts the leader to add the proposed configuration to the active configuration
await _cluster.AddMemberAsync(clusterMemberId, address, token);
return Ok();
}
}
Then in Startup
's ConfigureServices
I registered the ClusterMemberAnnouncer
delegate as:
services.AddTransient<ClusterMemberAnnouncer<UriEndPoint>>(serviceProvider =>
async (memberId, address, cancellationToken) =>
{
var cluster = serviceProvider.GetService<IRaftHttpCluster>();
// Get the first node registered in the cluster. This should pick up the
// node which was manually added in the AddClusterMembers method
var nodeClient = cluster.As<IMessageBus>().Members
.FirstOrDefault();
if (nodeClient != null)
{
Console.WriteLine($"Found node client {nodeClient.EndPoint}");
// Send a post message to the client to register the node (just using a CustomHttpClient
// to ignore any SSL errors as this POC uses a self-signed certificate)
var httpClient = serviceProvider.GetService<CustomHttpClient>();
var endpoint = new Uri(new Uri(nodeClient.EndPoint.ToString()), "api/v1/cluster/register-node");
var request = new RegisterNodeMessage
{ NodeId = memberId.ToString(), Address = address.ToString() };
var response = await httpClient.PostAsync(endpoint, new StringContent(JsonConvert.SerializeObject(request), Encoding.UTF8, "application/json"));
if (!response.IsSuccessStatusCode)
{
throw new Exception($"Unable to register the node with endpoint {endpoint.AbsoluteUri}");
}
}
});
This meant I could then start the first node on port 3262 in cold start mode (which ignores the ClusterMemberAnnouncer
delegate) and start subsequent nodes on other ports not in cold start mode. They then use the ClusterMemberAnnouncer
delegate which finds the manually added 3262 node from the AddClusterMembers
method and send it a HttpPost
message. Node 3262 adds it to its configuration and adds it to the cluster.
It may not be the optimal approach to this problem but I hope it helps someone else.
Ultimately I found Raft.Next to be too complex and unstable for our use case (it seems to randomly throw TaskCanceledException
s and often had issues reading state from the WAL).