I have a system with 10 machines where I need to perform a certain task on each machine, one at a time, in a synchronized order. Only one machine should be doing the task at any given time. We already use Consul for other purposes, and I was wondering whether we can use Consul for this as well.
I read more about it, and it looks like we can use leader election with Consul: each machine tries to acquire a lock, does the work, and then releases the lock. Once the lock is released, another machine acquires it and does the same work. This way everything is synchronized, one machine at a time.
I decided to use the C# PlayFab ConsulDotNet library, which appears to have this capability built in, but I am open to better options if there are any. The Action method below is called on every machine at almost the same time through a watcher mechanism.
private void Action() {
    // Try to acquire lock using Consul.
    // If lock acquired then DoTheWork() otherwise keep waiting for it until lock is acquired.
    // Once work is done, release the lock
    // so that some other machine can acquire the lock and do the same work.
}
Inside the method above, the idea is that all 10 machines should call DoTheWork() one at a time, in a synchronized order. Based on this blog and this blog, I decided to modify their example to fit our needs.
Below is my LeaderElectionService class:
using System;
using System.Threading;
using System.Threading.Tasks;
using Consul;

public class LeaderElectionService
{
    public LeaderElectionService(string leadershipLockKey)
    {
        this.key = leadershipLockKey;
    }

    public event EventHandler<LeaderChangedEventArgs> LeaderChanged;
    string key;
    CancellationTokenSource cts = new CancellationTokenSource();
    Timer timer;
    bool lastIsHeld = false;
    IDistributedLock distributedLock;

    public void Start()
    {
        timer = new Timer(async (object state) => await TryAcquireLock((CancellationToken)state), cts.Token, 0, Timeout.Infinite);
    }

    private async Task TryAcquireLock(CancellationToken token)
    {
        if (token.IsCancellationRequested)
            return;
        try
        {
            if (distributedLock == null)
            {
                var clientConfig = new ConsulClientConfiguration { Address = new Uri("http://consul.host.domain.com") };
                ConsulClient client = new ConsulClient(clientConfig);
                distributedLock = await client.AcquireLock(new LockOptions(key) { LockTryOnce = true, LockWaitTime = TimeSpan.FromSeconds(3) }, token).ConfigureAwait(false);
            }
            else
            {
                if (!distributedLock.IsHeld)
                {
                    await distributedLock.Acquire(token).ConfigureAwait(false);
                }
            }
        }
        catch (LockMaxAttemptsReachedException ex)
        {
            // This is expected if the lock could not be acquired on the first attempt.
            Console.WriteLine(ex.StackTrace);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.StackTrace);
        }
        finally
        {
            bool lockHeld = distributedLock?.IsHeld == true;
            HandleLockStatusChange(lockHeld);
            // Retrigger the timer after a 10 second delay (in this example).
            // Delay for only 7s if the lock is not held, because the AcquireLock call blocks for ~3s on every failed attempt.
            timer.Change(lockHeld ? 10000 : 7000, Timeout.Infinite);
        }
    }

    protected virtual void HandleLockStatusChange(bool isHeldNew)
    {
        // Is this the right way to check and do the work here?
        // In general I want to call method "DoTheWork" in "Action" method itself
        // and then release and destroy the session once work is done.
        if (isHeldNew)
        {
            // DoTheWork();
            Console.WriteLine("Hello");
            // And then where should I release the lock so that another machine can try to grab it?
            // distributedLock.Release();
            // distributedLock.Destroy();
        }
        if (lastIsHeld == isHeldNew)
            return;
        else
        {
            lastIsHeld = isHeldNew;
        }
        if (LeaderChanged != null)
        {
            LeaderChangedEventArgs args = new LeaderChangedEventArgs(lastIsHeld);
            foreach (EventHandler<LeaderChangedEventArgs> handler in LeaderChanged.GetInvocationList())
            {
                try
                {
                    handler(this, args);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.StackTrace);
                }
            }
        }
    }
}
And below is my LeaderChangedEventArgs class:
public class LeaderChangedEventArgs : EventArgs
{
    private bool isLeader;

    public LeaderChangedEventArgs(bool isHeld)
    {
        isLeader = isHeld;
    }

    public bool IsLeader { get { return isLeader; } }
}
The code above has many pieces that may not be needed for my use case, but the idea is the same.
Problem Statement
In my Action method I would like to use the class above and perform the task as soon as the lock is acquired, and otherwise keep waiting for the lock. Once the work is done, I want to release the lock and destroy the session so that another machine can grab it and do the work. I am confused about how to use the class above properly in the method below.
private void Action() {
    LeaderElectionService electionService = new LeaderElectionService("data/process");
    // electionService.LeaderChanged += (source, arguments) => Console.WriteLine(arguments.IsLeader ? "Leader" : "Slave");
    electionService.Start();
    // now how do I wait for the lock to be acquired here indefinitely
    // And once lock is acquired, do the work and then release and destroy the session
    // so that other machine can grab the lock and do the work
}
I only recently started working with C#, which is why I am confused about how to make this work efficiently in production using Consul and this library.
Update
I tried the code below as per your suggestion (I think I tried this earlier as well), but for some reason, as soon as execution reaches the line await distributedLock.Acquire(cancellationToken); it returns to the Main method. It never reaches my "Doing Some Work!" printout. Does CreateLock actually work? I expect it to create data/lock in Consul (since the key does not exist yet), then try to acquire the lock on it, and, if the lock is acquired, do the work and release it for the other machines.
private static CancellationTokenSource cts = new CancellationTokenSource();

public static void Main(string[] args)
{
    Action(cts.Token);
    Console.WriteLine("Hello World");
}

private static async Task Action(CancellationToken cancellationToken)
{
    const string keyName = "data/lock";
    var clientConfig = new ConsulClientConfiguration { Address = new Uri("http://consul.test.host.com") };
    ConsulClient client = new ConsulClient(clientConfig);
    var distributedLock = client.CreateLock(keyName);
    while (true)
    {
        try
        {
            // Try to acquire lock
            // As soon as it comes to this line,
            // it just goes back to main method automatically. not sure why
            await distributedLock.Acquire(cancellationToken);
            // Lock is acquired
            // DoTheWork();
            Console.WriteLine("Doing Some Work!");
            // Work is done. Jump out of loop to release the lock
            break;
        }
        catch (LockHeldException)
        {
            // Cannot acquire the lock. Wait a while then retry
            await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
        }
        catch (Exception)
        {
            // TODO: Handle exception thrown by DoTheWork method
            // Here we jump out of the loop to release the lock
            // But you can try to acquire the lock again based on your requirements
            break;
        }
    }
    // Release and destroy the lock
    // So that other machine can grab the lock and do the work
    await distributedLock.Release(cancellationToken);
    await distributedLock.Destroy(cancellationToken);
}
IMO, the LeaderElectionService from those blogs is overkill in your case.
Update 1
There is no need for a while loop because:
1. ConsulClient is a local variable, so there is no need to check the IsHeld property.
2. Acquire blocks indefinitely unless you either set LockTryOnce to true in LockOptions or pass a CancellationToken with a timeout.
Side note: it is not necessary to invoke the Destroy method after you call Release on the distributed lock (reference).
private async Task Action(CancellationToken cancellationToken)
{
    const string keyName = "YOUR_KEY";
    var client = new ConsulClient();
    var distributedLock = client.CreateLock(keyName);
    try
    {
        // Try to acquire lock
        // NOTE:
        // Acquire method will block indefinitely unless
        // 1. Set LockTryOnce = true in LockOptions
        // 2. Pass a timeout to cancellation token
        await distributedLock.Acquire(cancellationToken);
        // Lock is acquired
        DoTheWork();
    }
    catch (Exception)
    {
        // TODO: Handle exception thrown by DoTheWork method
    }
    // Release the lock (not necessary to invoke Destroy method),
    // so that other machine can grab the lock and do the work
    await distributedLock.Release(cancellationToken);
}
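If you do want the wait to be bounded, here is a rough sketch of how the two options could look together. The class name BoundedLockExample, the helper TryRunOnceAsync, and its parameters are made up for illustration; only the Consul calls (CreateLock, LockOptions, Acquire, Release, IsHeld, LockMaxAttemptsReachedException) come from the library. It also releases the lock only if it is actually held, because as far as I know Release throws LockNotHeldException otherwise. I have not run this against a live Consul agent, so treat it as a starting point.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Consul;

public static class BoundedLockExample
{
    // Hypothetical helper: returns true if the work ran, false if the lock
    // could not be acquired within waitTime.
    public static async Task<bool> TryRunOnceAsync(
        string keyName, Action doTheWork, TimeSpan waitTime, CancellationToken cancellationToken)
    {
        using (var client = new ConsulClient())
        {
            // LockTryOnce makes Acquire give up after a single attempt
            // that waits at most LockWaitTime.
            var options = new LockOptions(keyName)
            {
                LockTryOnce = true,
                LockWaitTime = waitTime
            };
            var distributedLock = client.CreateLock(options);
            try
            {
                await distributedLock.Acquire(cancellationToken);
                doTheWork();
                return true;
            }
            catch (LockMaxAttemptsReachedException)
            {
                // Another machine still holds the lock.
                return false;
            }
            finally
            {
                // Release only when the lock is held. CancellationToken.None is
                // used so that a cancelled caller token does not block the release.
                if (distributedLock.IsHeld)
                {
                    await distributedLock.Release(CancellationToken.None);
                }
            }
        }
    }
}
```

A caller could retry when this returns false. To bound the wait by time without LockTryOnce, pass the token of a new CancellationTokenSource(TimeSpan.FromSeconds(30)) to Acquire instead; I have not assumed which exception Acquire throws in that case, so the sketch simply lets it propagate.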
Update 2
The reason the OP's code returns to the Main method right away is that the Action method is not awaited. If you use C# 7.1 or later, you can make Main async and await the call to Action.
public static async Task Main(string[] args)
{
    await Action(cts.Token);
    Console.WriteLine("Hello World");
}
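To see the effect without Consul, here is a small self-contained sketch. FakeAcquireAsync is a made-up stand-in for distributedLock.Acquire (it is just a Task.Delay), and AwaitDemo, ActionAsync, RunWithoutAwait and RunWithAwaitAsync are names I invented for the demo. It records the order in which "main" and "work" happen with and without the await.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AwaitDemo
{
    // Hypothetical stand-in for distributedLock.Acquire: completes after a short delay.
    private static Task FakeAcquireAsync() => Task.Delay(200);

    private static async Task ActionAsync(List<string> log)
    {
        // The task is not complete yet, so control returns to the caller here.
        await FakeAcquireAsync();
        log.Add("work");
    }

    // Mirrors the question's Main: the task is started but not awaited.
    public static List<string> RunWithoutAwait()
    {
        var log = new List<string>();
        Task pending = ActionAsync(log);
        log.Add("main");   // runs before "work"
        pending.Wait();    // only here so the demo can show the late "work" entry
        return log;
    }

    // Mirrors the fix: await the task before continuing.
    public static async Task<List<string>> RunWithAwaitAsync()
    {
        var log = new List<string>();
        await ActionAsync(log);
        log.Add("main");
        return log;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", RunWithoutAwait()));         // main, work
        Console.WriteLine(string.Join(", ", await RunWithAwaitAsync())); // work, main
    }
}
```

In the question's program there is nothing like the pending.Wait() call, so the process can exit right after printing "Hello World", before the work ever runs.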