Search code examples
c#masstransit.net-8.0

The StartJobAttempt<StartJobAttempt> (Event) execution faulted


I'm using MassTransit 8.25.0 on .NET 8.

I have a Windows service with a scheduler and a job consumer. Can't figure out how to queue a job, IPublishEndpoint from example is a scoped service, so I can't inject it.

My attempts

  1. clientFactory.CreateRequestClient
public class ImportScheduler(
    IScheduleConfig<ImportScheduler> config,
    ILogger<ImportScheduler> logger,
    IClientFactory clientFactory)
    : CronJobService(config.CronExpression, config.TimeZoneInfo, logger)
{
    protected override async Task DoWork(CancellationToken cancellationToken)
    {
        var requestClient = clientFactory.CreateRequestClient<ImportRequested>();

        var jobId = NewId.Next();
        await requestClient.GetResponse<JobSubmissionAccepted>(new ImportRequested(
            jobId.ToGuid(),
            Guid.Parse("7718D173-1F3F-4F8C-B282-5B5A1C183BCE")), cancellationToken);
    }
}

This throws an error:

MassTransit.EventExecutionException: The StartJobAttempt (Event) execution faulted
MassTransit.PayloadNotFoundException: The payload was not found: MassTransit.MessageSchedulerContext

  1. Tried to create IPublishEndpoint from IBus with GetPublishSendEndpoint, got ISendEndpoint
public class ImportScheduler(
    IScheduleConfig<ImportScheduler> config,
    ILogger<ImportScheduler> logger,
    IBus bus)
    : CronJobService(config.CronExpression, config.TimeZoneInfo, logger)
{
    protected override async Task DoWork(CancellationToken cancellationToken)
    {
        var publishEndpoint = await bus.GetPublishSendEndpoint<SubmitJob<ImportRequested>>();

        var jobId = NewId.Next();
        await publishEndpoint.Send(new
        {
            JobId = jobId,
            Job = new ImportRequested(
                jobId.ToGuid(),
                Guid.Parse("7718D173-1F3F-4F8C-B282-5B5A1C183BCE"))
        }, cancellationToken);
    }
}
  1. Used bus.Publish<SubmitJob<ImportRequested>> directly, but got error similar to the first one.

Also I got an error below, but not exactly sure where it comes from

System.NullReferenceException: Object reference not set to an instance of an object.

at MassTransit.ExceptionInfoException..ctor(ExceptionInfo exceptionInfo) in //src/MassTransit.Abstractions/Exceptions/ExceptionInfoException.cs:line 14
at MassTransit.JobService.FinalizeJobConsumer1.Consume(ConsumeContext1 context) in /
/src/MassTransit/JobService/JobService/FinalizeJobConsumer.cs:line 43
at MassTransit.Middleware.MethodConsumerMessageFilter2.MassTransit.IFilter<MassTransit.ConsumerConsumeContext<TConsumer,TMessage>>.Send(ConsumerConsumeContext2 context, IPipe1 next) in /_/src/MassTransit/Middleware/MethodConsumerMessageFilter.cs:line 28 at MassTransit.Configuration.PipeConfigurator1.LastPipe.Send(TContext context) in //src/MassTransit.Abstractions/Middleware/Configuration/PipeBuilder.cs:line 123
at MassTransit.Consumer.DelegateConsumerFactory1.Send[TMessage](ConsumeContext1 context, IPipe1 next) in /_/src/MassTransit/Consumers/Consumer/DelegateConsumerFactory.cs:line 29 at MassTransit.Consumer.DelegateConsumerFactory1.Send[TMessage](ConsumeContext1 context, IPipe1 next) in /
/src/MassTransit/Consumers/Consumer/DelegateConsumerFactory.cs:line 39
at MassTransit.Middleware.ConsumerMessageFilter2.MassTransit.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext1 context, IPipe`1 next) in /_/src/MassTransit/Middleware/ConsumerMessageFilter.cs:line 48

Configuration

services.AddMassTransit(x =>
{
    // some consumers

    x.AddConsumer<ImportConsumer>(cc =>
    {
        cc.Options<JobOptions<ImportRequested>>(xx =>
            xx.SetRetry(r => r.None()).SetJobTimeout(TimeSpan.FromHours(3)).SetConcurrentJobLimit(1));
    });

    x.SetJobConsumerOptions();
    x.AddJobSagaStateMachines();
    x.SetInMemorySagaRepositoryProvider();

    x.SetKebabCaseEndpointNameFormatter();

    x.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.Host(configuration.GetValue<string>("AzureServiceBus"));

        cfg.ConfigureEndpoints(context);
    });
});

If I change my consumer from IJobConsumer to IConsumer, I am able to queue it using IBus.Publish


Solution

  • First, you can simply your job submission:

    public class ImportScheduler(
        IScheduleConfig<ImportScheduler> config,
        ILogger<ImportScheduler> logger,
        IBus bus)
        : CronJobService(config.CronExpression, config.TimeZoneInfo, logger)
    {
        protected override async Task DoWork(CancellationToken cancellationToken)
        {
            var jobId = NewId.Next();
            await bus.Publish<SubmitJob<ImportRequested>>(new
            {
                JobId = jobId,
                Job = new ImportRequested(
                    jobId.ToGuid(),
                    Guid.Parse("7718D173-1F3F-4F8C-B282-5B5A1C183BCE"))
            }, cancellationToken);
        }
    }
    

    And your bus configuration is missing the scheduler.

    services.AddMassTransit(x =>
    {
        // some consumers
    
        x.AddConsumer<ImportConsumer>(cc =>
        {
            cc.Options<JobOptions<ImportRequested>>(xx =>
                xx.SetRetry(r => r.None()).SetJobTimeout(TimeSpan.FromHours(3)).SetConcurrentJobLimit(1));
        });
    
        x.SetJobConsumerOptions();
        x.AddJobSagaStateMachines();
        x.SetInMemorySagaRepositoryProvider();
    
        x.SetKebabCaseEndpointNameFormatter();
    
        x.UsingAzureServiceBus((context, cfg) =>
        {
            cfg.Host(configuration.GetValue<string>("AzureServiceBus"));
    
            cfg.UseServiceBusMessageScheduler();
    
            cfg.ConfigureEndpoints(context);
        });
    });
    

    This is all demonstrated in the sample as well.