scala mqtt mosquitto hivemq

Unable to Connect to any MQTT Broker from my Local machine using HiveMQ Async Client


I have the following publish method that connects to a given broker, sends a message, and then disconnects:

 def publish(mqttCfg: MqttConfig, topic: String, mqttQos: MqttQos): Future[Unit] = {
    val client = asyncMqttClient(mqttCfg)

    // Define a custom wrapper type to represent the result of the publish operation
    sealed trait PublishResult
    case class SuccessfulPublish(mqttPublishResult: Mqtt5PublishResult) extends PublishResult
    case class FailedPublish(error: Throwable) extends PublishResult

    asyncMqttClient(mqttCfg).connect()
      .thenCompose(_ => client.publishWith().topic(topic).qos(mqttQos).payload("HELLO WORLD!".getBytes()).send())
      .thenAccept(result => {
        val publishResult = Try(result)
        publishResult match {
          case Success(message) =>
            println(s"publishedResult: $message") // TODO: Change to logger
          case Failure(error) =>
            println(s"Failed to publish: ${error.getMessage}") // TODO: Change to logger
        }
      })
      .thenCompose(_ => client.disconnect())
      .thenAccept(_ => println("disconnected"))
      .asScala.map(_ => ())
  }

I then have a ScalaTest test that exercises it like this:

  "MqttClientFactory#publish" should "connect to a local MQTT broker and publish" in {
    val mqttConfig = MqttConfig("cpo-platform-test", "test.mosquitto.org", 1883)
    val published = MqttClientFactory.publish(
      mqttConfig,
      "cpo-test-topic",
      MqttQos.EXACTLY_ONCE
    )
    whenReady(published, timeout(Span(100, Seconds))) { Unit => {
      val client = MqttClientFactory.asyncMqttClient(mqttConfig)
      println("In here ****************** ")
      client
        .connect()
        .thenCompose(_ => client.subscribeWith().topicFilter("cpo-test-topic").qos(MqttQos.EXACTLY_ONCE).callback(println).send())
      }
    }
  }

When I run this, it fails with the following error at the point where I wait for the Future to complete in whenReady(...):

The future returned an exception of type: java.util.concurrent.CompletionException, with message: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected..
ScalaTestFailureLocation: com.openelectrons.cpo.mqtt.MqttClientFactoryTest at (MqttClientFactoryTest.scala:29)

I tried several brokers on my local machine, including the Eclipse Mosquitto broker and the Cedalo broker, and all of them produce the same error. What am I doing wrong? It is annoying that such a simple connection is so hard to get working. Any help?

EDIT: Further details added:

  def asyncMqttClient(mqttCfg: MqttConfig): Mqtt5AsyncClient = {
    Mqtt5Client.builder()
      .identifier(mqttCfg.appName)
      .serverHost(mqttCfg.serverHost)
      .serverPort(mqttCfg.serverPort)
      .simpleAuth()
        .username(mqttCfg.user.getOrElse(""))
        .password(mqttCfg.pass.getOrElse("").getBytes("UTF-8"))
      .applySimpleAuth()
      .buildAsync()
  }

I use the following Docker Compose file to start my local Mosquitto MQTT server:

version: "3.7"
services:
  mqtt5:
    image: eclipse-mosquitto
    container_name: mqtt5
    ports:
      - 1883:1883 #default mqtt port
      - 9001:9001 #default mqtt port for websockets
    volumes:
      - /opt/softwares/mosquitto/mqtt5/config:/mosquitto/config

The MQTT broker is successfully started as shown in the screenshot below:

[Screenshot: Mosquitto broker startup output]

EDIT:

Here is my mosquitto.conf:

listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log

Here is a screenshot of the logs:

[Screenshot: Mosquitto logs]

EDIT:

joesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 mosquitto_pub -t /test/message -m 'Hello World!'
joesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 tail -f /mosquitto/log/mosquitto.log
1696296934: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696298735: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696300536: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696302337: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696304138: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696305939: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696307740: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696309170: New connection from 127.0.0.1:39422 on port 1883.
1696309170: New client connected from 127.0.0.1:39422 as auto-8817AB58-2BA0-33D2-5AB0-A6176558E97C (p2, c1, k60).
1696309170: Client auto-8817AB58-2BA0-33D2-5AB0-A6176558E97C disconnected.
^Cjoesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 mosquitto_sub -v -t /test/message
/test/message Hello World!

With the Scala test, I see the following in the broker log:

1696310903: New connection from 192.168.208.1:57752 on port 1883.
1696310903: New client connected from 192.168.208.1:57752 as cpo-platform-test (p5, c1, k60).
1696310903: Client cpo-platform-test closed its connection.

Solution

  • Here is a simple POC that I was able to run locally. It doesn't validate anything. It only starts an eclipse-mosquitto container, connects to the service using hivemq-mqtt-client, publishes a message, subscribes to the topic, and prints the received message to stdout.

    • build.sbt
    libraryDependencies ++= Seq(
      "com.hivemq" % "hivemq-mqtt-client" % "1.3.2",
      "org.scalatest" %% "scalatest" % "3.2.16" % Test,
      "com.dimafeng" %% "testcontainers-scala-scalatest" % TestcontainersScalaVersion % Test
    )
    
    • docker-compose.yaml
    version: "3"
    services:
      mosquitto:
        image: eclipse-mosquitto:2.0.18
        volumes:
          - /absolute/path/to/mosquitto/config/:/mosquitto/config
        ports:
          - 1883:1883
          - 9001:9001
    
    • /absolute/path/to/mosquitto/config/mosquitto.conf
    listener 1883
    allow_anonymous true
    persistence true
    persistence_location /mosquitto/data/
    log_dest file /mosquitto/log/mosquitto.log
    
    • DummyMosquittoTest.scala
    import com.dimafeng.testcontainers.scalatest.TestContainersForAll
    import com.dimafeng.testcontainers.{DockerComposeContainer, ExposedService}
    import com.hivemq.client.mqtt.datatypes.MqttQos
    import com.hivemq.client.mqtt.mqtt5.{Mqtt5AsyncClient, Mqtt5Client}
    import org.scalatest.funsuite.AsyncFunSuite
    import org.scalatest.matchers.should.Matchers
    
    import java.io.File
    import java.util.UUID
    import scala.jdk.FutureConverters._
    
    class TestcontainersMainTest
        extends AsyncFunSuite
        with Matchers
        with TestContainersForAll {
    
      override type Containers = DockerComposeContainer
    
      override def startContainers(): DockerComposeContainer = {
        DockerComposeContainer
          .Def(
            composeFiles = new File(
              this.getClass.getClassLoader
                .getResource("docker-compose.yaml")
                .getFile
            ),
            exposedServices = Seq(ExposedService(name = "mosquitto", port = 1883))
          )
          .start()
      }
    
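      test("mosquitto container") {
        withContainers { _ =>
          // NOTE: this host is HiveMQ's public broker. To exercise the local
          // container instead, use "localhost" with the port mapped in
          // docker-compose.yaml (1883).
          val client: Mqtt5AsyncClient = Mqtt5Client
            .builder()
            .identifier(UUID.randomUUID().toString)
            .serverHost("broker.hivemq.com")
            .buildAsync()
    
          // Chain every step so that each one starts only after the previous
          // one has completed: connect, then subscribe, then publish.
          // Returning the whole chain lets the async test wait for all of it.
          client
            .connect()
            .thenCompose(_ =>
              client
                .subscribeWith()
                .topicFilter("test/topic")
                .qos(MqttQos.EXACTLY_ONCE)
                .callback(x => println(new String(x.getPayloadAsBytes)))
                .send()
            )
            .thenCompose(_ =>
              client
                .publishWith()
                .topic("test/topic")
                .payload("some random message!!!".getBytes)
                .send()
            )
            .asScala
            .map(_ => succeed) // no real validation, only that the chain completed
        }
      }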
    }
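
  • As for the error itself: in the question's publish method, asyncMqttClient(mqttCfg) is called twice. The first call is stored in client, the second call builds a separate instance, and only that second instance gets connect() called on it. The later publishWith() and disconnect() calls go to client, which was never connected, and that appears to be what raises "MQTT client is not connected". Below is a minimal sketch of the same method reusing one client instance. The MqttConfig case class and the MqttPublisher object are stand-ins I made up so the snippet is self-contained, and simple auth is left out on the assumption that the broker allows anonymous clients, as in the mosquitto.conf above.

```scala
import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.{Mqtt5AsyncClient, Mqtt5Client}

import java.nio.charset.StandardCharsets
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters._

// Hypothetical stand-in for the question's MqttConfig (only the fields used here).
final case class MqttConfig(appName: String, serverHost: String, serverPort: Int)

// Hypothetical holder object, named for illustration only.
object MqttPublisher {

  // Builds an anonymous client; assumes the broker has allow_anonymous true.
  def asyncMqttClient(cfg: MqttConfig): Mqtt5AsyncClient =
    Mqtt5Client.builder()
      .identifier(cfg.appName)
      .serverHost(cfg.serverHost)
      .serverPort(cfg.serverPort)
      .buildAsync()

  def publish(cfg: MqttConfig, topic: String, qos: MqttQos)
             (implicit ec: ExecutionContext): Future[Unit] = {
    // Build the client once and use this same instance for every step.
    val client = asyncMqttClient(cfg)

    client.connect()
      .thenCompose(_ =>
        client.publishWith()
          .topic(topic)
          .qos(qos)
          .payload("HELLO WORLD!".getBytes(StandardCharsets.UTF_8))
          .send()
      )
      .thenCompose(_ => client.disconnect())
      .asScala
      .map(_ => ()) // a failure in any stage fails the returned Future
  }
}
```

    Note that thenAccept only runs when the previous stage completed successfully, so wrapping its argument in Try, as the question does, never reaches the Failure branch. In this sketch a failed connect or publish simply fails the returned Future, which whenReady (or recover on the Scala side) can then report.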