Search code examples
pythonsocketszeromq

Can't receive data on pull socket using zeroMQ, "Address in use" - Error


I currently try to set up a plain Push-Pull-Socket architecture using zeroMQ whereas Metatrader 4 (MQL) acts as publisher and my Python backend acts as consumer.

I push data from Metatrader 4 terminal every second which works just fine. However, I'm having tough times to receive data on the pull socket. Once I try to pull that data out of the wire the atom script package raises the error address already in use.

I run both the MT 4 terminal and the Python script on my local machine while in development.

Metatrader 4:

extern string PROJECT_NAME = "Dashex.Feeder";
extern string ZEROMQ_PROTOCOL = "tcp";
extern string HOSTNAME = "*";
extern int PUSH_PORT = 32220;

extern string t0 = "--- Feeder Parameters ---";
input string DID = "insert your DID here";
extern string t1 = "--- ZeroMQ Configuration ---";
extern bool Publish_MarketData = false;

// ZeroMQ environment //

// CREATE ZeroMQ Context
Context context(PROJECT_NAME);

// CREATE ZMQ_PUSH SOCKET
Socket pushSocket(context, ZMQ_PUSH);

string Publish_Symbols[7] = {
   "EURUSD","GBPUSD","USDJPY","USDCAD","AUDUSD","NZDUSD","USDCHF"
};

//+------------------------------------------------------------------+
//| Expert initialization function                                   |
//+------------------------------------------------------------------+

int OnInit()
  {
//---

   EventSetTimer(1);     // Set Millisecond Timer to get client socket input

   context.setBlocky(false);

   // Send responses to PULL_PORT that client is listening on.
   Print("[PUSH] Connecting MT4 Server to Socket on Port " + IntegerToString(PUSH_PORT) + "..");
   pushSocket.connect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));

   pushSocket.setSendHighWaterMark(1);
   pushSocket.setLinger(0);   

//---
   return(INIT_SUCCEEDED);
  }

//+------------------------------------------------------------------+
//| Expert deinitialization function                                 |
//+------------------------------------------------------------------+
void OnDeinit(const int reason)
{
//---

   Print("[PUSH] Disconnecting MT4 Server from Socket on Port " + IntegerToString(PUSH_PORT) + "..");
   pushSocket.disconnect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));

   // Shutdown ZeroMQ Context
   context.shutdown();
   context.destroy(0);

   EventKillTimer();
}
//+------------------------------------------------------------------+
//| Expert tick function                                             |
//+------------------------------------------------------------------+
void OnTimer()
{
   /*
      Use this OnTimer() function to send market data to consumer.
   */
   if(!IsStopped() && Publish_MarketData == true)
   {
      for(int s = 0; s < ArraySize(Publish_Symbols); s++)
      {
         // Python clients can subscribe to a price feed by setting
         // socket options to the symbol name. For example:

         string _tick = GetBidAsk(Publish_Symbols[s]);
         Print("Sending " + Publish_Symbols[s] + " " + _tick + " to PUSH Socket");
         ZmqMsg reply(StringFormat("%s %s", Publish_Symbols[s], _tick));
         pushSocket.send(reply, true);
      }
   }
}
//+------------------------------------------------------------------+

string GetBidAsk(string symbol) {

   MqlTick last_tick;

   if(SymbolInfoTick(symbol,last_tick))
   {
       return(StringFormat("%f;%f", last_tick.bid, last_tick.ask));
   }

   // Default
   return "";
}

Pushing data works as intended:

enter image description here

Python based pull socket:

import zmq
import time

context = zmq.Context()
zmq_socket = context.socket(zmq.PULL)
zmq_socket.bind("tcp://*:32220")
time.sleep(1)

while True:
    result = zmq_socket.recv()
    print(result)
    time.sleep(1)

This is what script reports in the console:

enter image description here

Netstat output:

enter image description here

Note: When I terminate both the Metatrader push script and the python script the port is still marked as "listened" in netstats. When I change the port to 32225 (or any other) in both instances and re-run them, I get the same error again. If I first run the pull instance I have a sandglass popping up in atom script, when I then run the MT4 push instance nothing happens on the pull side. When I then re-run the pull instance I get the same error again.

Update:

The python.exe instance in the background occupied the port. I shut down the python execution and the port was released again. When I now run my pull instance i receive the following console feedback:

1.)

enter image description here

2.)

I then run the push instance which works just fine.

3.)

The pull instance still shows the sandglass and doesn't print any data into the console:

enter image description here

4.)

When I then re-run the pull instance it raises the error address in use which now makes sense since Python uses that port still in the background.

But why isn't any data printed on the pull side? Do I have to change the pull client code in order to be able to "grab" the pushed data?


Solution

  • The problem is with your PUSH code, here:

    extern string HOSTNAME = "*";
    

    While you can legitimately use * for the hostname component in a bind URL (in which case it means "listen on all addresses"), it doesn't make any sense in a connect call: you must provide a valid hostname or ip address.

    If you were to modify your code to read:

    extern string HOSTNAME = "localhost";
    

    It would probably work as expected.

    Here's a simple Python PUSH client I used to test your PULL code; if you run this and run your PULL code (as posted in your question), it all works:

    import time
    import zmq
    
    c = zmq.Context()
    s = c.socket(zmq.PUSH)
    
    s.connect('tcp://localhost:32220')
    
    i = 0
    while True:
        s.send_string('this is message {}'.format(i))
        i += 1
        time.sleep(0.5)