Search code examples
amazon-web-servicesmqtttelegrafinfluxdb-2aws-iot-core

telegraf: AWS IoT-Core to InfluxDB: i/o timeout


I have an existing (working) setup that uses InfluxDB's Native Subscriptions to transfer data from the AWS IoT-Core (as the MQTT broker) to our InfluxDB-Cloud instance. Native Subscriptions are being removed as a feature in InfluxDB - and while I can still use it at the moment there's no guarantee in the long run. The setup used port 8883

Solution: use InfluxDB's telegraf to achieve the same functionality. The data is sent in 'influx' line protocol already, so no additional parsing is necessary. My setup uses the telegraf:1.27.1 docker image to deploy telegraf in a container. I'm volume-mapping the certificates and telegraf-configuration into the container.

Here's the telegraf.conf file:

# Global settings
[agent]
  interval = "5s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = "1ms"
  debug = true
  quiet = false

[[inputs.mqtt_consumer]]
  servers = ["ssl://<AWS-ID>.iot.<LOCATION>.amazonaws.com:8883"]
  qos = 0
  connection_timeout = "30s"
  topics = ["374174.016/data"]
  client_id = "27-telegraf-e9cabc28-6a2b-4423-be74-31fb3dd231cd"
  data_format = "influx"

  # SSL configuration
  tls_ca = "/etc/telegraf/ca.pem"
  tls_cert = "/etc/telegraf/cert.pem"
  tls_key = "/etc/telegraf/key.pem"
  # insecure_skip_verify = false

[[outputs.influxdb_v2]]
  urls = ["https://<LOCATION>.aws.cloud2.influxdata.com/"]
  token = "[REDACTED]"
  organization = "[REDACTED]"
  bucket = "TELEGRAF_DEV"

Here are the startup (and failure) logs for telegraf:

2023-07-06T12:54:11Z I! Loading config: /etc/telegraf/telegraf.conf
2023-07-06T12:54:11Z I! Starting Telegraf 1.27.1
2023-07-06T12:54:11Z I! Available plugins: 237 inputs, 9 aggregators, 28 processors, 23 parsers, 59 outputs, 4 secret-stores
2023-07-06T12:54:11Z I! Loaded inputs: mqtt_consumer
2023-07-06T12:54:11Z I! Loaded aggregators: 
2023-07-06T12:54:11Z I! Loaded processors: 
2023-07-06T12:54:11Z I! Loaded secretstores: 
2023-07-06T12:54:11Z I! Loaded outputs: influxdb_v2
2023-07-06T12:54:11Z I! Tags enabled: host=d847c648021c
2023-07-06T12:54:11Z I! [agent] Config: Interval:5s, Quiet:false, Hostname:"d847c648021c", Flush Interval:10s
2023-07-06T12:54:11Z D! [agent] Initializing plugins
2023-07-06T12:54:11Z D! [agent] Connecting outputs
2023-07-06T12:54:11Z D! [agent] Attempting connection to [outputs.influxdb_v2]
2023-07-06T12:54:11Z D! [agent] Successfully connected to outputs.influxdb_v2
2023-07-06T12:54:11Z D! [agent] Starting service inputs
2023-07-06T12:55:11Z E! [telegraf] Error running agent: starting input inputs.mqtt_consumer: network Error : read tcp 172.17.0.2:51268->52.29.126.248:8443: i/o timeout

I've also written a Python-script to just read a couple of messages off the IoT-core from the same topic using the same certificate files (Python 3.9.6, awscrt==0.14.7, awsiot==0.1.3)

import json
import os.path
import time
import uuid
from dataclasses import dataclass
from typing import Callable, List, Any, Dict

from awscrt import io
from awscrt.mqtt import QoS
from awsiot import mqtt_connection_builder  # type: ignore


class AWSMQTTClient:

    def __init__(
            self,
            host_name: str,
            client_id: str,
            cert_dir: str = "."
    ):
        # write to files (compatibility with AWS library)
        self.cert_file = f"{cert_dir}/cert.pem"
        self.priv_file = f"{cert_dir}/key.pem"
        self.root_file = f"{cert_dir}/ca.pem"

        # check if files exist
        if not os.path.exists(self.cert_file):
            raise FileNotFoundError(f"cert file not found: {self.cert_file}")
        if not os.path.exists(self.priv_file):
            raise FileNotFoundError(f"priv file not found: {self.priv_file}")
        if not os.path.exists(self.root_file):
            raise FileNotFoundError(f"root file not found: {self.root_file}")

        # set up connection object
        event_loop_group = io.EventLoopGroup(1)
        host_resolver = io.DefaultHostResolver(event_loop_group)
        client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
        self.mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=host_name,
            cert_filepath=self.cert_file,
            pri_key_filepath=self.priv_file,
            client_bootstrap=client_bootstrap,
            ca_filepath=self.root_file,
            client_id=f"{client_id}_{uuid.uuid4()}",
            clean_session=False,
            keep_alive_secs=6,
        )

    def subscribe_to_topic(
            self,
            topic_name: str,
            duration: float,
            on_receive: Callable,
    ) -> None:
        _connect_future = self.mqtt_connection.connect()

        sub = self.mqtt_connection.subscribe(topic=topic_name, qos=QoS(1), callback=on_receive)
        time.sleep(duration)
        _connect_future.result()
        _disconnect_future = self.mqtt_connection.disconnect()
        _disconnect_future.result()
        if sub[0].running():
            raise ConnectionError("connection not closed")


@dataclass
class TopicInfo:
    topic: str
    message: str


@dataclass
class TopicResponse:
    topic_infos: List[TopicInfo]

    def add_topic(self, topic_info: TopicInfo):
        self.topic_infos.append(topic_info)

    def on_topic_received(
            self,
            topic: str,
            payload: bytes,
            dup: Any, qos: Any, retain: Any,
            **kwargs: Dict[Any, Any],
    ) -> None:
        try:
            message_json = json.loads(payload)
            if "message" in message_json.keys():
                message = message_json["message"]
            else:
                message = f"{message_json}"
        except json.decoder.JSONDecodeError:
            message = f"{payload.decode('ascii')}"
        topic_info = TopicInfo(
            topic=topic,
            message=message,
        )
        if topic_info not in self.topic_infos:
            self.add_topic(topic_info=topic_info)


if __name__ == '__main__':

    # settings
    _host_name = "<AWS ID>.iot.<LOCATION>.amazonaws.com"
    _client_id = f"{uuid.uuid4()}"
    _topic = "374174.016/data"

    aws_mqtt_client = AWSMQTTClient(
        host_name=_host_name,
        client_id=_client_id,
    )

    response = TopicResponse(topic_infos=[])
    aws_mqtt_client.subscribe_to_topic(
        topic_name=_topic,
        on_receive=response.on_topic_received,
        duration=7.0,
    )
    print("")
    print("----RECEIVED:----")
    print(response.topic_infos)
    print("-----------------")
    print("")
    assert response.topic_infos.__len__() != 0

The Python script works as expected - it reads (and logs) a couple of messages. From what I can tell it uses Port 443 though instead of 8883.

Here's the Makefile I'm using to deploy both telegraf and the Python script listen_to_mqtt.py via docker:

###########################
# Telegraf test setup     #
###########################

TELEGRAF_IMAGE=telegraf-test-image
TELEGRAF_CONTAINER=telegraf-test-container
TELEGRAF_LOCAL_DIR := $(shell pwd)
TELEGRAF_INTERNAL_CONFIG_DIR = /etc/telegraf

deploy_telegraf:
    docker run \
        --detach \
        --name $(TELEGRAF_CONTAINER) \
        -v $(TELEGRAF_LOCAL_DIR)/ca.pem:$(TELEGRAF_INTERNAL_CONFIG_DIR)/ca.pem \
        -v $(TELEGRAF_LOCAL_DIR)/cert.pem:$(TELEGRAF_INTERNAL_CONFIG_DIR)/cert.pem \
        -v $(TELEGRAF_LOCAL_DIR)/key.pem:$(TELEGRAF_INTERNAL_CONFIG_DIR)/key.pem \
        -v $(TELEGRAF_LOCAL_DIR)/telegraf.conf:$(TELEGRAF_INTERNAL_CONFIG_DIR)/telegraf.conf:ro \
        telegraf:1.27.1

remove_telegraf:
    -docker rm -f $(TELEGRAF_CONTAINER)

watch_logs_telegraf:
    docker logs -f $(TELEGRAF_CONTAINER)

redeploy_telegraf:
    make remove_telegraf
    make deploy_telegraf
    make watch_logs_telegraf

access_container:
    docker exec -it $(TELEGRAF_CONTAINER) bash

DOCKER_NETWORK_INTERFACE = docker0  # replace this with your Docker network interface

analyze_traffic:
    sudo tcpdump -i $(DOCKER_NETWORK_INTERFACE) -U -w ./tcpdump.pcap   #'port 8883'

analyze_traffic_file:
    wireshark ./tcpdump.pcap

analyze_traffic_file_python:
    wireshark ./tcpdump-py.pcap

perform_connection_check:
    docker exec -it $(TELEGRAF_CONTAINER) ping a1q5qvt4b4ldkh-ats.iot.eu-central-1.amazonaws.com

###########################
# Python equivalent       #
###########################

TELEGRAF_PY_IMAGE=telegraf-test-image-py
TELEGRAF_PY_CONTAINER=telegraf-test-container-py
TELEGRAF_PY_DOCKERFILE=telegraf-py.Dockerfile

build_python:
    docker build -t $(TELEGRAF_PY_IMAGE) -f $(TELEGRAF_PY_DOCKERFILE) .

deploy_python:
    docker run \
        --detach --rm \
        --name $(TELEGRAF_PY_CONTAINER) \
        $(TELEGRAF_PY_IMAGE)

watch_logs_python:
    docker logs -f $(TELEGRAF_PY_CONTAINER)

remove_python:
    -docker rm -f $(TELEGRAF_PY_CONTAINER)

redeploy_python:
    make remove_python
    make build_python
    make deploy_python
    make watch_logs_python

And here's the Dockerfile for the Python script:

FROM python:3.9.6 as py3106
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR /app

# copy requirement files
COPY ./requirements.txt ./requirements.txt

# install awscli
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
RUN unzip awscliv2.zip
RUN ./aws/install

# install general dependencies
RUN apt install gcc
RUN python -m pip install --upgrade pip && pip install --no-cache-dir -r requirements.txt

# copy script & certificates
COPY ./listen_to_mqtt.py ./listen_to_mqtt.py
COPY ./ca.pem ./ca.pem
COPY ./cert.pem ./cert.pem
COPY ./key.pem ./key.pem

# run the script
CMD python listen_to_mqtt.py

I used wireshark and tcpdump to analyze the network traffic from the container, but I'm admittedly a newbie doing that. When setting telegraf up with Port 8883 it fails within ~30s, and I don't see any key exchange and the [SYN] call doesn't receive a [SYN ACK]. When setting it up with Port 443 I see certificate exchanges and it takes around ~60s for the, essentially, same error message to appear: i/o timeout.

So things I tried are:

  • changing the telegraf version (for 1.18.3 the container doesn't crash, but it fails to receive messages, later versions crash after mentioned delay)
  • changing the port (443, 8883, 8443)
  • ping the AWS server from inside the telegraf-container before it crashes (works!)
  • settings insecure_skip_verify = false or insecure_skip_verify = true or leaving it out entirely
  • played around with the AWS IoT-Core's security policy (i.e. TLS13_1_3_2022_10, TLS12_1_0_2015_01, etc.) -- I see via wireshark that telegraf then uses TLS1.3 or TLS1.2, but apart from that no mayor change in behavior occurs

I'm expecting this to be about some tiny setting or configuration in either telegraf or AWS that I just don't know about. I'm not sure how I can proceed after spending close to a day on this already.


Solution

  • turns out the solution was quite simple, but a bit confusing due to the Python script: the outgoing Port was blocked in our network. The Python script (as mentioned) used Port 443 so it got through - Port 443 wouldn't work for telegraf though - and Port 8883 was blocked leading to the timeout error.