Paho MQTT golang for multiple modules?


I am writing a microservice in Go for an MQTT module. This module will be used by different functions at the same time, and I am using gRPC as the transport layer. I have written this connect function:

func Connect() { //it would be Connect(payload1 struct,topic string)

    deviceID := flag.String("device", "handler-1", "GCP Device-Id")
    bridge := struct {
        host *string
        port *string
    }{
        flag.String("mqtt_host", "", "MQTT Bridge Host"),
        flag.String("mqtt_port", "", "MQTT Bridge Port"),
    }
    projectID := flag.String("project", "", "GCP Project ID")
    registryID := flag.String("registry", "", "Cloud IoT Registry ID (short form)")
    region := flag.String("region", "", "GCP Region")
    certsCA := flag.String("ca_certs", "", "Download https://pki.google.com/roots.pem")
    privateKey := flag.String("private_key", "", "Path to private key file")

    qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")

    // Parse before dereferencing any flag; until then every pointer above
    // still holds its default value.
    flag.Parse()

    server := fmt.Sprintf("ssl://%v:%v", *bridge.host, *bridge.port)
    topic := struct {
        config    string
        telemetry string
    }{
        config:    fmt.Sprintf("/devices/%v/config", *deviceID),
        telemetry: fmt.Sprintf("/devices/%v/events/topic", *deviceID),
    }
    clientid := fmt.Sprintf("projects/%v/locations/%v/registries/%v/devices/%v",
        *projectID,
        *region,
        *registryID,
        *deviceID,
    )
    log.Println("[main] Loading Google's roots")
    certpool := x509.NewCertPool()
    pemCerts, err := ioutil.ReadFile(*certsCA)
    if err == nil {
        certpool.AppendCertsFromPEM(pemCerts)
    }

    log.Println("[main] Creating TLS Config")
    config := &tls.Config{
        RootCAs:            certpool,
        ClientAuth:         tls.NoClientCert,
        ClientCAs:          nil,
        InsecureSkipVerify: true,
        Certificates:       []tls.Certificate{},
        MinVersion:         tls.VersionTLS12,
    }

    connOpts := MQTT.NewClientOptions().
        AddBroker(server).
        SetClientID(clientid).
        SetAutoReconnect(true).
        SetPingTimeout(10 * time.Second).
        SetKeepAlive(10 * time.Second).
        SetDefaultPublishHandler(onMessageReceived).
        SetConnectionLostHandler(connLostHandler).
        SetReconnectingHandler(reconnHandler).
        SetTLSConfig(config)
    connOpts.SetUsername("unused")
    ///JWT Generation Starts from Here
    token := jwt.New(jwt.SigningMethodES256)
    token.Claims = jwt.StandardClaims{
        Audience:  *projectID,
        IssuedAt:  time.Now().Unix(),
        ExpiresAt: time.Now().Add(24 * time.Hour).Unix(),
    }
    //Reading key file
    log.Println("[main] Load Private Key")
    keyBytes, err := ioutil.ReadFile(*privateKey)
    if err != nil {
        log.Fatal(err)
    }
    //Parsing key from file
    log.Println("[main] Parse Private Key")
    key, err := jwt.ParseECPrivateKeyFromPEM(keyBytes)
    if err != nil {
        log.Fatal(err)
    }
    //Signing JWT with private key
    log.Println("[main] Sign String")
    tokenString, err := token.SignedString(key)
    if err != nil {
        log.Fatal(err)
    }
    //JWT Generation Ends here

    connOpts.SetPassword(tokenString)
    connOpts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(topic.config, byte(*qos), nil); token.Wait() && token.Error() != nil {
            log.Fatal(token.Error())
        }
    }

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        fmt.Printf("Not Connected..Retrying...  %s\n", server)
    } else {
        fmt.Printf("Connected to %s\n", server)
    }

}

I am calling this function in a goroutine in my main.go:

func main() {
    fmt.Println("Server started at port 5005")
    lis, err := net.Listen("tcp", "0.0.0.0:5005")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    //Creating keepAlive channel and registering it for Ctrl+C (needs the os/signal import)
    keepAlive := make(chan os.Signal, 1)
    signal.Notify(keepAlive, os.Interrupt)
    go func() {
        //checking for internet connection
        for !IsOnline() {
            fmt.Println("No Internet Connection..Retrying")
            //looking for internet connection after every 8 seconds
            time.Sleep(8 * time.Second)
        }
        fmt.Println("Internet connected...connecting to mqtt broker")
        repositories.Connect()
        //looking for interrupt (Ctrl+C)
        value := <-keepAlive
        //If Ctrl+C is pressed then exit the application
        if value == os.Interrupt {
            fmt.Printf("Exiting the application")
            os.Exit(3)
        }
    }()
    s := grpc.NewServer()
    MqttRepository := repositories.MqttRepository()
    // It creates a new gRPC server instance
    rpc.NewMqttServer(s, MqttRepository)
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)

    }
}

func IsOnline() bool {
    client := http.Client{
        Timeout: 5 * time.Second,
    }
    //default url to check connection is https://google.com
    resp, err := client.Get("https://google.com")
    if err != nil {
        return false
    }
    //close the body so the underlying connection is released
    resp.Body.Close()

    return true
}

I run it in a goroutine in main so that the connection is made each time the application starts.

Now I want to use this MQTT Connect function to publish the data from other different functions.

For example, function A could call Connect(payload1, topic1) and function B could call Connect(payload2, topic2), and this function would then handle publishing the data to the cloud.

Should I just add the topic and payload in this Connect function and then call it from another function? Or can I return the client, or export it as a global, and then use it in another function or goroutine? I am sorry if my question sounds very stupid; I am not a Go expert.


Solution

  • Now I want to use this MQTT Connect function to publish the data from other different functions.

    I suspect I may be misunderstanding what you are trying to do here, but unless you have a specific reason for making multiple connections, you are best to connect once and then use that single connection to publish multiple messages. Establishing a connection each time you send a message has a few issues, including:

    • Establishing the connection takes time and generates a bit of network traffic (TLS handshake etc).
    • There can only be one active connection for a given ClientID (if you establish a second connection the broker will close the previous connection).
    • The library will not automatically disconnect - you would need to call Disconnect after publishing.
    • Incoming messages are likely to be lost due to the connection being down (note that CleanSession defaults to true).

    Should I just add the topic and payload in this Connect function and then call it from another function?

    As mentioned above, the preferred approach is to connect once and then publish multiple messages over that one connection. The Client is designed to be thread safe, so you can pass it around and call Publish from multiple goroutines. You can also use the AutoReconnect option (which you already set) if you want the library to manage the connection (there is also a SetConnectRetry function), but bear in mind that a QoS 0 message will not be retried if the link is down when you attempt to send it.

    I would suggest that your connect function return the client (i.e. func Connect() mqtt.Client) and that you then use that client to publish messages (you can store it somewhere or just pass it around; I'd suggest adding it to your gRPC server struct).
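
    A minimal sketch of that suggestion, written so it runs without a broker. The `publisher` interface, `fakeClient`, `mqttServer`, `HandleA`, and `HandleB` are hypothetical names introduced for illustration and are not part of Paho or gRPC. In real code `Connect` would build and return the Paho client from the question, and a thin adapter would call its Publish method and wait on the returned token.

```go
package main

import (
	"fmt"
	"sync"
)

// publisher is a hypothetical, minimal view of a connected MQTT client.
// A thin adapter around the real Paho client could satisfy it.
type publisher interface {
	Publish(topic string, qos byte, payload []byte) error
}

// fakeClient stands in for the shared connection so the sketch runs offline.
// It guards its state with a mutex so concurrent callers are safe.
type fakeClient struct {
	mu   sync.Mutex
	sent map[string]int // number of messages recorded per topic
}

func (f *fakeClient) Publish(topic string, qos byte, payload []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.sent[topic]++
	return nil
}

// Connect is called once at startup and returns the client instead of
// discarding it. Here it only builds the fake.
func Connect() publisher {
	return &fakeClient{sent: make(map[string]int)}
}

// mqttServer plays the role of the gRPC server struct that owns the client.
type mqttServer struct {
	client publisher
}

// HandleA and HandleB represent two unrelated functions that publish
// through the same connection.
func (s *mqttServer) HandleA(payload []byte) error {
	return s.client.Publish("topic1", 0, payload)
}

func (s *mqttServer) HandleB(payload []byte) error {
	return s.client.Publish("topic2", 0, payload)
}

// publishConcurrently calls each handler from n goroutines and waits for all.
func publishConcurrently(s *mqttServer, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); _ = s.HandleA([]byte("payload1")) }()
		go func() { defer wg.Done(); _ = s.HandleB([]byte("payload2")) }()
	}
	wg.Wait()
}

func main() {
	s := &mqttServer{client: Connect()} // connect once
	publishConcurrently(s, 10)
	fc := s.client.(*fakeClient)
	fmt.Println(fc.sent["topic1"], fc.sent["topic2"]) // prints "10 10"
}
```

    Hiding the client behind a small interface is a design choice made here for illustration; it also lets the handlers be tested without a broker.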

    I guess it is possible that you may need to establish multiple connections if you need to connect with a specific client ID in order to send to the desired topic (but generally you would give your server's connection access to a wide range of topics). This would require some work to ensure that you don't try to establish multiple connections with the same client ID simultaneously and, depending upon your requirements, to handle receiving incoming messages.
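
    If multiple connections really are needed, one way to avoid opening two with the same client ID is to keep them in a map guarded by a mutex. The following is only a sketch under that assumption: `conn`, `pool`, `dial`, and `countDials` are hypothetical names, and `dial` stands in for whatever function actually connects to the broker.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a connected MQTT client.
type conn struct{ clientID string }

// pool hands out at most one connection per client ID.
type pool struct {
	mu    sync.Mutex
	conns map[string]*conn
	dial  func(clientID string) (*conn, error) // hypothetical connect function
}

func newPool(dial func(string) (*conn, error)) *pool {
	return &pool{conns: make(map[string]*conn), dial: dial}
}

// get returns the existing connection for clientID or dials a new one.
// The mutex is held while dialing, which keeps the sketch simple but
// serialises all dials, even those for different client IDs.
func (p *pool) get(clientID string) (*conn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.conns[clientID]; ok {
		return c, nil
	}
	c, err := p.dial(clientID)
	if err != nil {
		return nil, err
	}
	p.conns[clientID] = c
	return c, nil
}

// countDials requests every ID from `workers` goroutines and reports how
// many times the dial function actually ran.
func countDials(ids []string, workers int) int {
	dials := 0
	p := newPool(func(id string) (*conn, error) {
		dials++ // safe: get calls dial while holding p.mu
		return &conn{clientID: id}, nil
	})
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		for _, id := range ids {
			wg.Add(1)
			go func(id string) {
				defer wg.Done()
				_, _ = p.get(id)
			}(id)
		}
	}
	wg.Wait()
	return dials
}

func main() {
	// Sixteen concurrent requests, but only one dial per client ID.
	fmt.Println(countDials([]string{"device-a", "device-b"}, 8)) // prints 2
}
```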

    A few additional notes:

    • If you use AutoReconnect and SetConnectRetry you can simplify your code (and just use IsConnectionOpen() to check whether the connection is up, removing the need for IsOnline()).
    • The spec states that "The Server MUST allow ClientIds which are between 1 and 23 UTF-8 encoded bytes in length" - it looks like yours is longer than that (I have not used GCP and it may well support/require a longer client ID).
    • You should not need InsecureSkipVerify in production.
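
    The last two notes can be sketched with the standard library alone. This is an illustration, not the required GCP setup: `newTLSConfig` and `exceedsGuaranteedLength` are hypothetical helper names, and the client ID in `main` is a made-up value in the same format as the question's. Unlike the question's code, the sketch fails when the CA file yields no certificates, since with verification enabled an empty pool would make every handshake fail.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newTLSConfig builds a client TLS config that verifies the broker's
// certificate against the given PEM-encoded roots.
func newTLSConfig(rootsPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(rootsPEM) {
		return nil, errors.New("no CA certificates found in PEM data")
	}
	return &tls.Config{
		RootCAs:    pool,
		MinVersion: tls.VersionTLS12,
		// InsecureSkipVerify is left at its zero value (false), so the
		// server certificate and host name are checked.
	}, nil
}

// exceedsGuaranteedLength reports whether a client ID is longer than the
// 23 bytes that the spec requires every server to accept. len counts
// bytes, which matches the spec's "UTF-8 encoded bytes".
func exceedsGuaranteedLength(clientID string) bool {
	return len(clientID) > 23
}

func main() {
	if _, err := newTLSConfig([]byte("not a certificate")); err != nil {
		fmt.Println("TLS config rejected:", err)
	}
	id := "projects/p/locations/r/registries/reg/devices/handler-1"
	fmt.Println(exceedsGuaranteedLength(id)) // prints true
}
```

    A client ID over 23 bytes is not an error in itself; it only means the broker has to explicitly support longer IDs.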