I'm using MassTransit 8.25.0 on .NET 8.
I have a Windows service with a scheduler and a job consumer. Can't figure out how to queue a job, IPublishEndpoint
from example is a scoped service, so I can't inject it.
My attempts
clientFactory.CreateRequestClient
public class ImportScheduler(
IScheduleConfig<ImportScheduler> config,
ILogger<ImportScheduler> logger,
IClientFactory clientFactory)
: CronJobService(config.CronExpression, config.TimeZoneInfo, logger)
{
protected override async Task DoWork(CancellationToken cancellationToken)
{
var requestClient = clientFactory.CreateRequestClient<ImportRequested>();
var jobId = NewId.Next();
await requestClient.GetResponse<JobSubmissionAccepted>(new ImportRequested(
jobId.ToGuid(),
Guid.Parse("7718D173-1F3F-4F8C-B282-5B5A1C183BCE")), cancellationToken);
}
}
This throws an error:
MassTransit.EventExecutionException: The StartJobAttempt (Event) execution faulted
MassTransit.PayloadNotFoundException: The payload was not found: MassTransit.MessageSchedulerContext
IPublishEndpoint
from IBus
with GetPublishSendEndpoint
, got ISendEndpoint
public class ImportScheduler(
IScheduleConfig<ImportScheduler> config,
ILogger<ImportScheduler> logger,
IBus bus)
: CronJobService(config.CronExpression, config.TimeZoneInfo, logger)
{
protected override async Task DoWork(CancellationToken cancellationToken)
{
var publishEndpoint = await bus.GetPublishSendEndpoint<SubmitJob<ImportRequested>>();
var jobId = NewId.Next();
await publishEndpoint.Send(new
{
JobId = jobId,
Job = new ImportRequested(
jobId.ToGuid(),
Guid.Parse("7718D173-1F3F-4F8C-B282-5B5A1C183BCE"))
}, cancellationToken);
}
}
bus.Publish<SubmitJob<ImportRequested>>
directly, but got error similar to the first one.Also I got an error below, but not exactly sure where it comes from
System.NullReferenceException: Object reference not set to an instance of an object.
at MassTransit.ExceptionInfoException..ctor(ExceptionInfo exceptionInfo) in //src/MassTransit.Abstractions/Exceptions/ExceptionInfoException.cs:line 14
at MassTransit.JobService.FinalizeJobConsumer1.Consume(ConsumeContext
1 context) in //src/MassTransit/JobService/JobService/FinalizeJobConsumer.cs:line 43
at MassTransit.Middleware.MethodConsumerMessageFilter2.MassTransit.IFilter<MassTransit.ConsumerConsumeContext<TConsumer,TMessage>>.Send(ConsumerConsumeContext
2 context, IPipe1 next) in /_/src/MassTransit/Middleware/MethodConsumerMessageFilter.cs:line 28 at MassTransit.Configuration.PipeConfigurator
1.LastPipe.Send(TContext context) in //src/MassTransit.Abstractions/Middleware/Configuration/PipeBuilder.cs:line 123
at MassTransit.Consumer.DelegateConsumerFactory1.Send[TMessage](ConsumeContext
1 context, IPipe1 next) in /_/src/MassTransit/Consumers/Consumer/DelegateConsumerFactory.cs:line 29 at MassTransit.Consumer.DelegateConsumerFactory
1.Send[TMessage](ConsumeContext1 context, IPipe
1 next) in //src/MassTransit/Consumers/Consumer/DelegateConsumerFactory.cs:line 39
at MassTransit.Middleware.ConsumerMessageFilter2.MassTransit.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext
1 context, IPipe`1 next) in /_/src/MassTransit/Middleware/ConsumerMessageFilter.cs:line 48
Configuration
services.AddMassTransit(x =>
{
// some consumers
x.AddConsumer<ImportConsumer>(cc =>
{
cc.Options<JobOptions<ImportRequested>>(xx =>
xx.SetRetry(r => r.None()).SetJobTimeout(TimeSpan.FromHours(3)).SetConcurrentJobLimit(1));
});
x.SetJobConsumerOptions();
x.AddJobSagaStateMachines();
x.SetInMemorySagaRepositoryProvider();
x.SetKebabCaseEndpointNameFormatter();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(configuration.GetValue<string>("AzureServiceBus"));
cfg.ConfigureEndpoints(context);
});
});
If I change my consumer from IJobConsumer
to IConsumer
, I am able to queue it using IBus.Publish
First, you can simply your job submission:
public class ImportScheduler(
IScheduleConfig<ImportScheduler> config,
ILogger<ImportScheduler> logger,
IBus bus)
: CronJobService(config.CronExpression, config.TimeZoneInfo, logger)
{
protected override async Task DoWork(CancellationToken cancellationToken)
{
var jobId = NewId.Next();
await bus.Publish<SubmitJob<ImportRequested>>(new
{
JobId = jobId,
Job = new ImportRequested(
jobId.ToGuid(),
Guid.Parse("7718D173-1F3F-4F8C-B282-5B5A1C183BCE"))
}, cancellationToken);
}
}
And your bus configuration is missing the scheduler.
services.AddMassTransit(x =>
{
// some consumers
x.AddConsumer<ImportConsumer>(cc =>
{
cc.Options<JobOptions<ImportRequested>>(xx =>
xx.SetRetry(r => r.None()).SetJobTimeout(TimeSpan.FromHours(3)).SetConcurrentJobLimit(1));
});
x.SetJobConsumerOptions();
x.AddJobSagaStateMachines();
x.SetInMemorySagaRepositoryProvider();
x.SetKebabCaseEndpointNameFormatter();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(configuration.GetValue<string>("AzureServiceBus"));
cfg.UseServiceBusMessageScheduler();
cfg.ConfigureEndpoints(context);
});
});
This is all demonstrated in the sample as well.