Tags: apache-flink, flink-streaming, influxdb, flink-sql, pyflink

How to sink message to InfluxDB using PyFlink?


I am trying to run the PyFlink walkthrough, but instead of sinking data to Elasticsearch, I want to use InfluxDB.
Note: the code in the walkthrough (link above) works as expected.

In order for this to work, we need to put an InfluxDB connector inside the Docker container. The other Flink connectors are placed inside the container with these commands in the Dockerfile:

# Download connector libraries
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/${FLINK_VERSION}/flink-json-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/${FLINK_VERSION}/flink-sql-connector-kafka_2.12-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/${FLINK_VERSION}/flink-sql-connector-elasticsearch7_2.12-${FLINK_VERSION}.jar;

I need help in order to:

  • Put an InfluxDB connector into the container
  • Modify the CREATE TABLE statement below so that it works for InfluxDB
        CREATE TABLE es_sink (
            id VARCHAR,
            value DOUBLE
        ) WITH (
            'connector' = 'elasticsearch-7',
            'hosts' = 'http://elasticsearch:9200',
            'index' = 'platform_measurements_1',
            'format' = 'json'
        )

Solution

  • From the documentation:

    As of the time of writing (14/06/2022), the Table and SQL APIs do not support InfluxDB: an SQL/table connector does not exist. The table connectors listed in the Flink documentation include Kafka, Upsert Kafka, Kinesis, JDBC, Elasticsearch, FileSystem, HBase, DataGen, Print, and BlackHole; InfluxDB is not among them.

    You can:

      • switch to the DataStream API and write a custom sink that pushes each record to InfluxDB via an InfluxDB client library, or
      • keep the SQL pipeline, sink to a supported system such as Kafka, and forward the data into InfluxDB with an external consumer (for example, Telegraf's Kafka consumer input plugin).
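If you go the custom-sink route, each record has to be serialized into InfluxDB's line protocol before it is written. The sketch below is a minimal, hypothetical illustration of that serialization step only (the helper name `row_to_line_protocol` and the measurement name are assumptions, not part of any Flink or InfluxDB API); the actual write to InfluxDB would happen inside your sink function.

```python
# Hypothetical sketch: serialize one record into InfluxDB line protocol,
# i.e. the text format "measurement,tag=v field=v[,field=v] [timestamp]".
# The helper name and measurement name are illustrative assumptions.
def row_to_line_protocol(measurement, tags, fields, timestamp_ns=None):
    """Return an InfluxDB line-protocol string for one record."""
    # Tags are unquoted key=value pairs, sorted for deterministic output.
    tag_part = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    # String field values must be double-quoted; numbers are written as-is.
    field_part = ",".join(
        f'{k}="{v}"' if isinstance(v, str) else f"{k}={v}"
        for k, v in sorted(fields.items())
    )
    line = f"{measurement},{tag_part} {field_part}"
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line

# A row matching the es_sink schema above (id VARCHAR, value DOUBLE):
print(row_to_line_protocol("platform_measurements",
                           {"id": "sensor-1"},
                           {"value": 42.5}))
# -> platform_measurements,id=sensor-1 value=42.5
```

Inside a PyFlink DataStream job, a function like this could be applied in a map step, with the resulting lines posted to InfluxDB's write endpoint from the sink.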