Search code examples
c#rabbitmqstreamingservicestackamqp

Using ServiceStack and RabbitMQ to send a stream


I am attempting to send a stream using RabbitMQ and Servicestack (v1.0.41 using .NET Core).

My Request implements ServiceStack.Web.IRequiresRequestStream, and the stream property is set in the client, but when it gets to the server, the stream is NULL.

Complete Repo
Server Code:

using System;
using System.IO;
using System.Threading.Tasks;
using Funq;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.DependencyInjection;
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;

namespace Server
{
    class Program
    {

        public static void Main(string[] args)
        {
            IWebHost host = new WebHostBuilder()
                .UseServer(new RabbitServer())
                .UseStartup<Startup>()
                .Build();

            host.Run();
        }
    }

    public class RabbitServer : IServer
    {
        public void Dispose(){}

        public void Start<TContext>(IHttpApplication<TContext> application){}

        public IFeatureCollection Features { get; } = new FeatureCollection();
    }

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging();
        }

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            app.UseServiceStack((AppHostBase)Activator.CreateInstance<AppHost>());
            app.Run((RequestDelegate)(context => (Task)Task.FromResult<int>(0)));
        }

    }

    public class AppHost : AppHostBase
    {
        public AppHost()
            : base("My Test Service", typeof(MyService).GetAssembly())
        {
        }

        public override void Configure(Container container)
        {
            var mqServer = new RabbitMqServer("127.0.0.1");
            container.Register<IMessageService>(mqServer);
            mqServer.RegisterHandler<HelloRequest>(ExecuteMessage);
            mqServer.Start();
        }
    }

    public class MyService : Service
    {
        public HelloResponse Any(HelloRequest request)
        {
            Console.WriteLine($"Stream is null: {request.RequestStream == null}");
            return new HelloResponse { Counter = request.Counter };

        }
    }

    public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
    {
        public int Counter { get; set; }

        public Stream RequestStream { get; set; }
    }

    public class HelloResponse
    {
        public int Counter { get; set; }
    }
}

Client Code:

using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;
using System;
using System.IO;
using System.Text;

namespace Client
{
    class Program
    {

        static void Main(string[] args)
        {
            RabbitMqServer messageService = new RabbitMqServer("127.0.0.1");
            RabbitMqQueueClient mqClient = messageService.MessageFactory.CreateMessageQueueClient() as RabbitMqQueueClient;
            var responseQueueName = mqClient.GetTempQueueName();
            MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes("Hello World!")) { Position = 0 };
            HelloRequest request = new HelloRequest { Counter = 100, RequestStream = ms };  //Counter is just some arbitary extra data
            Guid messageId = Guid.NewGuid();

            mqClient.Publish(QueueNames<HelloRequest>.In, new Message<HelloRequest>(request) { ReplyTo = responseQueueName, Id = messageId });
        }
    }

    public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
    {
        public int Counter { get; set; }
        public Stream RequestStream { get; set; }
    }

    public class HelloResponse
    {
        public int Counter { get; set; }
    }
}

Note: I realise I could just use a byte[] in my request object, but I would quite like to make use of the provided IRequiresRequestStream interface so I can switch back to using HTTP rather than AMQP in the future.

I should also say, that I probably won't be using the RabbitMQ Client provided by servicestack, as I am writing custom logic to convert from HTTP to AMQP, so I will be building the rabbitMQ request manually - the code above just demonstrates the problem I am having in the simplest way possible.

I'm going to assume that this won't just work out of the box with AMQP (as it does with HTTP) - so I was thinking that I need to do something like serialize the stream to a byte[] and include it in the RabbitMQ message and then populate the dto which ServiceStack magically re-hydrates on the Server.

So two questions really...
1. Am I on the right track?
2. If so, how do I hook into the de-serialization code on the server so that I have access to the raw RabbitMQ message in order to convert my byte[] back to a stream and set the stream on my dto?


Solution

  • You can't send a IRequiresRequestStream Request DTO into a MQ because it's not a normal serialized Request DTO, instead it instructs ServiceStack to skip deserializing the Request DTO and instead inject the HTTP Request Stream so the Service can perform its own Deserialization instead, this is different to a normal Request DTO which is serialized and can be sent as the body in an MQ Message.

    One option if you want to share implementation between a IRequiresRequestStream Service and a Service that can be called by MQ is to just delegate to a common Service that accepts bytes, e.g:

    //Called from HTTP
    public object Any(HelloStream request) => 
        Any(new HelloBytes { Bytes = request.RequestStream.ReadFully() });
    
    //Called from HTTP or MQ
    public object Any(HelloBytes request) 
    {
        //= request.Bytes
    }