
Need a ZeroMQ implementation of an ICommunicationListener on Azure Service Fabric


I am looking for a ZeroMQ implementation of an ICommunicationListener that I can use with Service Fabric to run a ZeroMQ endpoint on Azure.

I searched for hours and couldn't find one. Does anyone know of a solution? I currently use the "Service Fabric / .NET Core 2.0 stateless service" template,
which lets me either override
IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners(),
if I have an ICommunicationListener implementation for ZeroMQ,
or override Task RunAsync(CancellationToken cancellationToken),
if I want to set up the sockets myself.

My first attempt doesn't work:

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    using (var server = new ResponseSocket("tcp://xx.xx.xx.xx:xxxxx"))
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var message = server.ReceiveFrameBytes();
            ServiceEventSource.Current.ServiceMessage(this.Context, "Message {0}",
                System.Text.Encoding.UTF8.GetString(message));
        }
    }
}

The result is a service that won't start. I can't find much logging apart from this:

"There was an error during CodePackage activation.The service host terminated with exit code:255"


Solution

  • Here is a rough example of an ICommunicationListener implementation for ZeroMQ. It acts as a ZeroMQ ResponseSocket, but can easily be changed to a RequestSocket, SubscriberSocket, or any other NetMQ.Sockets.* socket type. It still needs more work, such as not throwing an exception when a message is received, but it should give a clear view of how it's done. It is largely inspired by existing .NET Core implementations of the ICommunicationListener interface.

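    using System;
    using System.Fabric;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.ServiceFabric.Services.Communication.Runtime;
    using NetMQ;
    using NetMQ.Sockets;

    public class ZeroMqResponseSocketCommunicationListener : ICommunicationListener, IDisposable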
    {
        private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
        private readonly ResponseSocket _responseSocket = new ResponseSocket();
        private readonly ServiceContext _serviceContext;
        private readonly string _endpointName;
    
        public ZeroMqResponseSocketCommunicationListener(ServiceContext serviceContext, string endpointName)
        {
            if (string.IsNullOrEmpty(endpointName))
                throw new ArgumentException("endpointName cannot be null or an empty string.", nameof(endpointName));
    
            _serviceContext = serviceContext;
            _endpointName = endpointName;
        }
    
        public Task<string> OpenAsync(CancellationToken cancellationToken)
        {
            var address = GetListenerUrl();
            if (address == null)
                throw new InvalidOperationException("No Url returned from ZeroMqResponseSocketCommunicationListener.GetListenerUrl");
    
    
            _responseSocket.Bind(address);
    
            ThreadPool.QueueUserWorkItem(state => MessageHandler(_cancellationToken.Token));
    
            return Task.FromResult(address);
        }
    
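        public Task CloseAsync(CancellationToken cancellationToken)
        {
            // Signal the receive loop to stop; it checks the token between messages.
            _cancellationToken.Cancel();
            _responseSocket.Close();
            return Task.CompletedTask;
        }
    
        public void Abort()
        {
            // Same shutdown path as CloseAsync, without waiting.
            _cancellationToken.Cancel();
            _responseSocket.Close();
        }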
    
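        private void MessageHandler(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Blocks until a frame arrives.
                var message = _responseSocket.ReceiveFrameBytes();

                // Placeholder: replace the throw with real handling. A ResponseSocket
                // must also send a reply before it can receive the next request.
                if (message != null)
                    throw new Exception($"Message {Encoding.UTF8.GetString(message)}");
            }
        }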
    
        private string GetListenerUrl()
        {
            var endpoints = _serviceContext.CodePackageActivationContext.GetEndpoints();
    
            if (!endpoints.Contains(_endpointName))
                throw new InvalidOperationException($"{_endpointName} not found in Service Manifest.");
    
            var serviceEndpoint = _serviceContext.CodePackageActivationContext.GetEndpoint(_endpointName);
    
            if (string.IsNullOrEmpty(serviceEndpoint.IpAddressOrFqdn))
                throw new InvalidOperationException("IpAddressOrFqdn not set on endpoint");
    
            if (serviceEndpoint.Port <= 0)
                throw new InvalidOperationException("Port not set on endpoint");
    
            var listenUrl = $"{serviceEndpoint.Protocol.ToString().ToLower()}://{serviceEndpoint.IpAddressOrFqdn}:{serviceEndpoint.Port}";
    
            return listenUrl;
        }
    
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    
        protected virtual void Dispose(bool disposing)
        {
            if (!disposing || _responseSocket == null) return;
    
            try
            {
                _responseSocket.Close();
                _responseSocket.Dispose();
            }
            catch (Exception ex)
            {
                ServiceEventSource.Current.Message(ex.Message);
            }
        }
    }
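
    The listing above throws as soon as a message arrives and blocks indefinitely in ReceiveFrameBytes, so cancellation is only noticed after the next message. One way the handler might be fleshed out is sketched below. The class ResponseLoopSketch, the 500 ms timeout and the "ACK" reply are illustrative assumptions, not part of the original answer, and the sketch assumes single-frame requests.

    ```csharp
    using System;
    using System.Text;
    using System.Threading;
    using NetMQ;
    using NetMQ.Sockets;

    // Hypothetical helper, not part of the original answer.
    public static class ResponseLoopSketch
    {
        public static void Run(ResponseSocket socket, CancellationToken cancellationToken)
        {
            // Poll with a timeout so the loop can notice cancellation
            // instead of blocking forever in a receive call.
            var timeout = TimeSpan.FromMilliseconds(500);

            while (!cancellationToken.IsCancellationRequested)
            {
                if (!socket.TryReceiveFrameBytes(timeout, out byte[] message))
                    continue; // nothing arrived within the timeout

                var text = Encoding.UTF8.GetString(message);

                // A REP socket has to answer each request before it can receive again.
                socket.SendFrame("ACK: " + text);
            }
        }
    }
    ```

    Something like this could replace the body of MessageHandler. Since NetMQ sockets are not thread-safe, it is probably safer to close the socket only after this loop has exited.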
    

    Then return the ZeroMqResponseSocketCommunicationListener from your Service Fabric service:

    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        // The factory delegate receives the service context, not a listener.
        yield return new ServiceInstanceListener(context => new ZeroMqResponseSocketCommunicationListener(context, "EndpointName"));
    }
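
    To check that the endpoint is reachable, a small NetMQ client along these lines may help. It is only a sketch: the address tcp://localhost:80 is a placeholder for wherever your endpoint is exposed, the class name ClientSketch is made up, and it assumes the listener's message handler has been changed to send a reply for each request rather than throw.

    ```csharp
    using System;
    using NetMQ;
    using NetMQ.Sockets;

    // Hypothetical test client, not part of the original answer.
    public static class ClientSketch
    {
        public static void Main()
        {
            using (var client = new RequestSocket())
            {
                // Placeholder address: substitute the host and port of your endpoint.
                client.Connect("tcp://localhost:80");
                client.SendFrame("Hello");

                // Give up after five seconds rather than blocking if no reply arrives.
                if (client.TryReceiveFrameString(TimeSpan.FromSeconds(5), out string reply))
                    Console.WriteLine("Reply: " + reply);
                else
                    Console.WriteLine("No reply within the timeout.");
            }
        }
    }
    ```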
    

    Make sure you have an endpoint specified in the ServiceManifest.xml of your service:

    <Resources>
      <Endpoints>
        <Endpoint Name="EndpointName" Port="80" Protocol="tcp" />
      </Endpoints>
    </Resources>