Tags: go, timeout, blocking, nonblocking, nanomsg

Problems with the nanomsg bus protocol in mangos, the Go package


I'd like to use nanomsg/NNG as the communication layer of a fully distributed, peer-to-peer, multi-node network, to support dynamic topology discovery and maintenance. I'm now stuck with its Go package, mangos.

The same work has been done in Python with pynng (a Python binding for NNG), but when I use Go and call the corresponding methods in mangos instead, the behavior is quite different. There are three main problems:

  1. The bus socket's Recv() blocks by default and does not seem to be configurable for non-blocking mode. The documentation says:

OptionRecvDeadline is the time until the next Recv times out. The value is a time.Duration. Zero value may be passed to indicate that no timeout should be applied. A negative value indicates a non-blocking operation. By default there is no timeout.

I tried a negative value as described, but Recv() still blocked. What else should I do? And what is the difference between a "zero timeout" and "non-blocking"?

  2. The dialer returned by (s *socket) NewDialer(...) seems to linger after dialer.Close(): a subsequent dialer.Dial() fails with "address in use". Yet when I call Close() on the dialer again, that also fails, reporting that it is already closed. I also tried various combinations of the following options, but none of them worked:
opts := make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true                    // or false
opts[mangos.OptionMaxReconnectTime] = time.Millisecond  // or zero 
opts[mangos.OptionKeepAliveTime] = time.Millisecond     // or even smaller
opts[mangos.OptionKeepAlive] = false                    // or true

What should I do to shut the dialer down completely, or to reuse the "pseudo-closed" dialer later?

  3. The bus socket's Send() behaves strangely. In my code, each node is supposed to send a message periodically. I physically disconnect a node (say "Node-X") from the network, keep it offline for a while, and then reconnect it. As soon as it reconnects, Node-X re-sends a large batch of messages. What I actually expect is that Node-X sends those messages into the air even when it has no neighbors, instead of delivering them all at once after reconnecting.

Is there any way to overcome these problems? I suspect I'm missing some options or configuration, but I haven't been able to figure out which.

The following code reproduces the re-dial and re-close errors:

package main

import (
    "fmt"
    "os"
    "time"

    "go.nanomsg.org/mangos/v3"
    "go.nanomsg.org/mangos/v3/protocol/bus"

    // register transports
    _ "go.nanomsg.org/mangos/v3/transport/all"
)

var (
    sock      mangos.Socket
    DialerMap map[string]mangos.Dialer // dialers keyed by target node ID
    opts      map[string]interface{}
)

func main() {
    var err error
    opts = make(map[string]interface{})
    opts[mangos.OptionDialAsynch] = true
    opts[mangos.OptionMaxReconnectTime] = time.Millisecond
    // opts[mangos.OptionKeepAliveTime] = time.Millisecond
    opts[mangos.OptionKeepAlive] = false
    DialerMap = make(map[string]mangos.Dialer)

    if sock, err = bus.NewSocket(); err != nil {
        fmt.Println("bus.NewSocket error. ", err)
        os.Exit(1)
    }
    TargetUUID := "node-A"
    TargetAddr := "tcp://192.168.0.172:60000" // change this to an available address
    MyDial(TargetUUID, TargetAddr)
    time.Sleep(time.Second * 2)
    MyClose(TargetUUID, TargetAddr)
    time.Sleep(time.Second * 2)
    MyDial(TargetUUID, TargetAddr) // dials the same (now closed) dialer again
    time.Sleep(time.Second * 2)
    MyClose(TargetUUID, TargetAddr)
    time.Sleep(100 * time.Second)
}

// MyDial creates a dialer for TargetUUID the first time it is called, then
// calls Dial() on the dialer stored in DialerMap, even if it was closed.
func MyDial(TargetUUID string, TargetAddr string) (mangos.Dialer, error) {
    dialer, ok := DialerMap[TargetUUID]
    if !ok {
        var err error
        dialer, err = sock.NewDialer(TargetAddr, opts)
        if err != nil {
            fmt.Println("NewDialer fails.", err)
            return nil, err // nothing was stored, so there is nothing to dial
        }
        DialerMap[TargetUUID] = dialer
    }
    err := dialer.Dial()
    if err != nil {
        fmt.Println("Dialer fails to dial()", err)
    } else {
        fmt.Println("Dialer succeeds to dial()")
    }
    return dialer, err
}

// MyClose closes the dialer stored for TargetUUID but leaves it in DialerMap.
func MyClose(TargetUUID string, TargetAddr string) {
    dialer, ok := DialerMap[TargetUUID]
    if !ok {
        fmt.Println("Dialer does not exist")
        return // avoid calling Close on a missing dialer
    }
    if err := dialer.Close(); err != nil {
        fmt.Println("dialer fails to close.", err)
    } else {
        fmt.Println("dialer succeeds to close")
    }
}

The console output is:

Dialer succeeds to dial()
dialer succeeds to close
Dialer fails to dial() address in use
dialer fails to close. object closed

Solution

  • I don't usually monitor Stack Overflow or Reddit for questions like this; we do have a Discord channel (linked from the mangos and NNG home pages), as well as a mailing list.

    Having said that, let me see if I can help (I'm the author of both NNG and mangos):

    1. OptionRecvDeadline is supported for bus. However, you're correct that it doesn't support non-blocking mode with a negative value; instead, a negative value is treated the same as zero, which means blocking. This is a documentation bug. To get logically non-blocking behavior, use the value 1 (a time.Duration of one nanosecond). That is effectively non-blocking, although the granularity may be limited by scheduler latency. (In this case it would be like doing "go close(channel); <-channel": very nearly non-blocking.)

    I'll see about fixing the documentation.

    2. Calling Close() on the dialer is the right thing to do. The dialer lingers until its pipes are closed, which happens automatically. Your very short redial time may be confounding this; to be honest, I had not considered tiny redial times. They are usually bad form, because if the peer is unavailable your code will spin hard on the processor trying to reconnect. I usually recommend capping the retry interval (mangos.OptionMaxReconnectTime) at no less than 10 milliseconds.

    3. I think you're seeing the effect of queueing, but I'm not 100% certain; I'd need to see a test case that reproduces it. The bus protocol is definitely best-effort delivery: if there are no connected peers, the message is dropped on the floor. (I just rechecked that to be certain.)