Tags: php, laravel, queue, mqtt

Running MQTT publish command in Laravel queue with database option fails but in sync it works


I am working on an app that has a background job which publishes commands to an MQTT broker. When the queue driver is sync it works, but when it is database only 1 out of 2 commands is sent (each command is a separately dispatched job). There are no errors in the Laravel log or the Supervisor log.

I have been googling all day and trying different things, but nothing has worked.

Here is my MQTT service class:

<?php

namespace App\Services;

use Illuminate\Contracts\Container\BindingResolutionException;
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
use PhpMqtt\Client\Exceptions\ClientNotConnectedToBrokerException;
use PhpMqtt\Client\Exceptions\RepositoryException;
use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException;
use PhpMqtt\Client\Exceptions\DataTransferException;
use PhpMqtt\Client\MqttClient;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Container\ContainerExceptionInterface;
use App\Models\Devices\Device;

class MqttService {

    public function sendCommand(Device $device, string $command, int $qos = 1)
    {
        $host = config('mqtt.host');
        $port = config('mqtt.port');
        $clientId = config('mqtt.client_id');


        $mqtt = new MqttClient($host, $port, $clientId);
        $mqtt->connect();

        $userId = $device->user->id;
        $deviceId = $device->id;
        $uniqueId = $device->unique_id;
        $topic = "v2/$userId/$deviceId/commands";

        if ($device->recognized_by_unique_id) {
            $topic = "v3/$uniqueId/commands";
        }

        $mqtt->publish($topic, $command, $qos);
        $mqtt->disconnect();
    }

    /**
     * Publishes error message for a specific device and user on the Mqtt broker.
     * 
     * @param string|null $userId the user id - first subtopic
     * @param string|null $deviceId the device id - last sub topic
     * @param string|null $uniqueId the device unique id - if it is using unique id instead of user id and device id
     * @param string|null $errorAsJson the json error message to publish on the mqtt broker
     * 
     * @return void 
     * 
     * @throws BindingResolutionException 
     * @throws NotFoundExceptionInterface 
     * @throws ContainerExceptionInterface 
     * @throws ConfigurationInvalidException 
     * @throws ConnectingToBrokerFailedException 
     * @throws ClientNotConnectedToBrokerException 
     * @throws RepositoryException 
     * @throws PendingMessageAlreadyExistsException 
     * @throws DataTransferException 
     */
    public static function sendError(?string $userId = null, ?string $deviceId = null, ?string $uniqueId = null, string $errorAsJson) 
    {
        $host = config('mqtt.host');
        $port = config('mqtt.port');
        $clientId = config('mqtt.client_id');

        if ($uniqueId) {
            $topic = "errors/$uniqueId";
        } else {
            $topic = "errors/$userId/$deviceId";
        }

        $mqtt = new MqttClient($host, $port, $clientId);
        $mqtt->connect();
        $mqtt->publish($topic, $errorAsJson, 0);
        $mqtt->disconnect();
    }

}

Here is my job:

<?php

namespace App\Jobs;

use App\Services\CommandService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Models\Devices\Device;

class SendCommand implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(public Device $device, public string $command) {}

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        info("Before: {$this->device->unique_id}; {$this->command}");
        CommandService::sendCommands($this->device, $this->command);
        info("After: {$this->device->unique_id}; {$this->command}");
    }
}

Here is also the relevant part of the CommandService class:

public static function sendCommands(Device $device, string $command)
{
    $mqtt = new MqttService;

    $mqtt->sendCommand($device, $command);

    return true;
}

I added the logging calls, and each message is logged twice. The only oddity is that both "Before:" lines are logged first and then both "After:" lines, though when I think about it that should be OK, as the jobs are running in parallel.

Which device's command actually gets sent seems to be random: sometimes it is device 1, sometimes device 2. I have had cases where both are sent, but that is rarer.

I am lost, any ideas?


Solution

  • As per the comments this is likely to be a combination of things:

    • Concurrent connections with the same client ID (the broker will drop existing connections with the same ID)
    • Using QoS 1 but ignoring the ACK (you just disconnect). You may as well use QoS 0, as messages will never be retransmitted with your current setup.
    • Sending multiple messages with the same packet ID/client ID (the packet ID should not be reused before an ACK is processed). Whether this is an issue will depend on the broker (client-initiated, unacknowledged QoS 1 messages are not part of the server session state, but brokers may handle this in different ways).

    This issue contains a detailed response to someone experiencing a similar problem. Potential solutions include:

    • Using a different, or no, client ID.
    • Waiting for the PUBACK (example here).
    • Using QoS 0; these messages are fire and forget with no delivery guarantee (but since you currently don't wait for a PUBACK, you have no guarantees anyway).
    • Establishing a long running connection with the broker (does not really fit with normal PHP flow; questions like this may help).
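
As a rough illustration of the first two options combined, here is a minimal sketch, assuming php-mqtt/client 1.x, where (as far as I know from its README) loop(true, true) keeps processing until no published messages are still waiting for an acknowledgement. The helper names uniqueClientId and publishAndWait are hypothetical and not part of your code or the library. The ID is kept to 23 alphanumeric characters because that is the length and character set every MQTT 3.1.1 broker is required to accept.

```php
<?php

use PhpMqtt\Client\MqttClient;

/**
 * Hypothetical helper: derive a per-connection client ID so that two queue
 * workers never connect with the same ID and kick each other off the broker.
 */
function uniqueClientId(string $prefix): string
{
    // Keep at most 7 alphanumeric characters of the configured ID ...
    $safePrefix = substr((string) preg_replace('/[^0-9a-zA-Z]/', '', $prefix), 0, 7);

    // ... and append 16 random hex characters (23 characters at most in total).
    return $safePrefix . bin2hex(random_bytes(8));
}

/**
 * Hypothetical helper: publish one QoS 1 message and wait for the broker's
 * PUBACK before disconnecting.
 */
function publishAndWait(string $host, int $port, string $baseClientId, string $topic, string $payload): void
{
    $mqtt = new MqttClient($host, $port, uniqueClientId($baseClientId));
    $mqtt->connect();

    // QoS 1: the message stays pending on the client until it is acknowledged.
    $mqtt->publish($topic, $payload, 1);

    // Assumption: with both flags true the loop returns once nothing is pending.
    $mqtt->loop(true, true);

    $mqtt->disconnect();
}
```

In MqttService::sendCommand this would stand in for the new MqttClient / connect / publish / disconnect lines, called with something like publishAndWait(config('mqtt.host'), (int) config('mqtt.port'), config('mqtt.client_id'), $topic, $command). Opening a connection per job is still relatively expensive, so the long-running connection option may be worth a look if you send many commands.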