I want to implement a custom activity that behaves like Signal Received: it suspends the workflow, and the workflow resumes and continues when I call it from the client side (Postman).
I don't want to use the existing Signal Received activity, because I need some additional logic to run before the workflow is suspended, in the same activity.
I followed the guidelines from here.
So I implemented a Signal Custom activity based on the default SignalReceived activity:
using System;
using Elsa.Activities.Signaling.Models;
using Elsa.ActivityResults;
using Elsa.Attributes;
using Elsa.Expressions;
using Elsa.Services;
using Elsa.Services.Models;

namespace Elsa.Workflows.CustomActivities.Signals
{
    [Trigger(
        Category = "Custom",
        DisplayName = "Signal Custom",
        Description = "Custom - Suspend workflow execution until the specified signal is received.",
        Outcomes = new[] { OutcomeNames.Done }
    )]
    public class SignalCustom : Activity
    {
        [ActivityInput(Hint = "The name of the signal to wait for.", SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid })]
        public string Signal { get; set; } = default!;

        [ActivityOutput(Hint = "The input that was received with the signal.")]
        public object SignalInput { get; set; }

        [ActivityOutput] public object Output { get; set; }

        protected override bool OnCanExecute(ActivityExecutionContext context)
        {
            if (context.Input is Signal triggeredSignal)
                return string.Equals(triggeredSignal.SignalName, Signal, StringComparison.OrdinalIgnoreCase);

            return false;
        }

        protected override IActivityExecutionResult OnExecute(ActivityExecutionContext context) =>
            context.WorkflowExecutionContext.IsFirstPass ? OnResume(context) : Suspend();

        protected override IActivityExecutionResult OnResume(ActivityExecutionContext context)
        {
            var triggeredSignal = context.GetInput<Signal>()!;
            SignalInput = triggeredSignal.Input;
            Output = triggeredSignal.Input;
            context.LogOutputProperty(this, nameof(Output), Output);
            return Done();
        }
    }
}
I created the SignalCustomBookmark.cs:
using Elsa.Services;

namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
    public class SignalCustomBookmark : IBookmark
    {
        public string Signal { get; set; } = default!;
    }
}
and SignalCustomBookmarkProvider.cs as well:
using Elsa.Services;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
    public class SignalCustomBookmarkProvider : BookmarkProvider<SignalCustomBookmark, SignalCustom>
    {
        public override async ValueTask<IEnumerable<BookmarkResult>> GetBookmarksAsync(BookmarkProviderContext<SignalCustom> context, CancellationToken cancellationToken) =>
            await GetBookmarksInternalAsync(context, cancellationToken).ToListAsync(cancellationToken);

        private async IAsyncEnumerable<BookmarkResult> GetBookmarksInternalAsync(BookmarkProviderContext<SignalCustom> context, [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            var signalName = (await context.ReadActivityPropertyAsync(x => x.Signal, cancellationToken))?.ToLowerInvariant().Trim();

            // Can't do anything with an empty signal name.
            if (string.IsNullOrEmpty(signalName))
                yield break;

            yield return Result(new SignalCustomBookmark
            {
                Signal = signalName
            });
        }
    }
}
I also registered the new bookmark provider in ConfigureServices(IServiceCollection services):
services.AddBookmarkProvider<SignalCustomBookmarkProvider>();
I created a test workflow and added this Signal Custom activity with test-signal as the signal.
I can start the workflow through the API and that works fine: once the instance reaches the Signal Custom activity, the workflow becomes suspended.
I start the workflow using this call from Postman:
https://localhost:5001/v1/workflows/{workflow_id}/execute
But then I want to resume it by calling
https://localhost:5001/v1/signals/test-signal/execute
with this body:
{
    "WorkflowInstanceId": "{Workflow_Instance_Id}"
}
Postman returns 200 OK with this body:
{
    "$id": "1",
    "startedWorkflows": []
}
Could you please guide me on how to resume the workflow from the client side?
The https://localhost:5001/v1/signals/test-signal/execute endpoint won't work for you because internally it uses ISignaler:
await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId)
The default implementation of ISignaler, in turn, does this:
var normalizedSignal = signal.ToLowerInvariant();

return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
    nameof(SignalReceived),
    new SignalReceivedBookmark { Signal = normalizedSignal },
    correlationId,
    workflowInstanceId,
    default,
    TenantId
), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);
Notice that the snippet above builds its workflow query from the SignalReceived activity and the SignalReceivedBookmark bookmark.
These are not the query parameters you need in order to trigger workflows that start with, or are blocked on, your custom SignalCustom activity and SignalCustomBookmark bookmark type.
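In other words, you need to take two more steps to make it work: add your own controller for the custom signal, and have it query for SignalCustom and SignalCustomBookmark instead of going through ISignaler. Better yet: define a new service called ICustomSignaler and have its implementation do the work.
For example: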
public interface ICustomSignaler
{
    /// <summary>
    /// Runs all workflows that start with or are blocked on the <see cref="SignalCustom"/> activity.
    /// </summary>
    Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = null, string? workflowInstanceId = null, string? correlationId = null, CancellationToken cancellationToken = default);
}
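public class CustomSignaler : ICustomSignaler
{
    // Assumption: no tenant is specified; set this for multi-tenant setups.
    private const string? TenantId = default;
    private readonly IWorkflowLaunchpad _workflowLaunchpad;

    public CustomSignaler(IWorkflowLaunchpad workflowLaunchpad)
    {
        _workflowLaunchpad = workflowLaunchpad;
    }

    public async Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = default, string? workflowInstanceId = default, string? correlationId = default, CancellationToken cancellationToken = default)
    {
        var normalizedSignal = signal.ToLowerInvariant();

        // Query on the custom activity and bookmark types instead of SignalReceived.
        return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
            nameof(SignalCustom),
            new SignalCustomBookmark { Signal = normalizedSignal },
            correlationId,
            workflowInstanceId,
            default,
            TenantId
        ), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);
    }
}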
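The new service also has to be registered with the dependency injection container before a controller can receive it. A minimal sketch, assuming ASP.NET Core's built-in container; the extension method name AddCustomSignaler is hypothetical, and the scoped lifetime is an assumption you may need to adjust to your setup:

```csharp
using Microsoft.Extensions.DependencyInjection;

public static class CustomSignalerServiceCollectionExtensions
{
    // Hypothetical helper; AddCustomSignaler is not part of Elsa.
    public static IServiceCollection AddCustomSignaler(this IServiceCollection services)
    {
        // Assumption: a scoped lifetime suits the signaler; change it if your setup differs.
        return services.AddScoped<ICustomSignaler, CustomSignaler>();
    }
}
```

You would then call services.AddCustomSignaler() in ConfigureServices, next to the AddBookmarkProvider call.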
Your custom controller could then look something like this:
[ApiController]
[Route("custom-signals/{signalName}/execute")]
[Produces("application/json")]
public class Execute : Controller
{
    private readonly ICustomSignaler _signaler;
    private readonly IEndpointContentSerializerSettingsProvider _serializerSettingsProvider;

    public Execute(ICustomSignaler signaler, IEndpointContentSerializerSettingsProvider serializerSettingsProvider)
    {
        _signaler = signaler;
        _serializerSettingsProvider = serializerSettingsProvider;
    }

    [HttpPost]
    public async Task<IActionResult> Handle(string signalName, ExecuteSignalRequest request, CancellationToken cancellationToken = default)
    {
        // Await the task first, then materialize the collected workflows.
        var result = (await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId, cancellationToken)).ToList();

        if (Response.HasStarted)
            return new EmptyResult();

        return Json(
            new ExecuteSignalResponse(result.Select(x => new CollectedWorkflow(x.WorkflowInstanceId, x.ActivityId)).ToList()),
            _serializerSettingsProvider.GetSettings());
    }
}
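With a controller routed like that, the client posts to the custom route instead of /v1/signals/.../execute. As a rough sketch of the same call Postman would make, here it is with HttpClient. The class name ResumeWorkflowExample is hypothetical, the host and port are taken from the question, and {Workflow_Instance_Id} is a placeholder you need to replace with a real instance ID:

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

public static class ResumeWorkflowExample
{
    public static async Task Main()
    {
        using var client = new HttpClient();

        // Route of the custom controller, with "test-signal" as the signal name.
        var url = "https://localhost:5001/custom-signals/test-signal/execute";

        // Same body as in the question; the instance ID is a placeholder.
        var json = "{ \"WorkflowInstanceId\": \"{Workflow_Instance_Id}\" }";
        using var content = new StringContent(json, Encoding.UTF8, "application/json");

        var response = await client.PostAsync(url, content);

        // Print the status code and the raw JSON response for inspection.
        Console.WriteLine((int)response.StatusCode);
        Console.WriteLine(await response.Content.ReadAsStringAsync());
    }
}
```

Note that a local HTTPS endpoint typically requires the development certificate to be trusted on the machine that runs this client.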
We're tracking two issues to see if we can provide a way to allow users to extend the SignalReceived activity without having too much custom code: