Search code examples
javaavrodebezium

How to get field from avro message created by Debezium?


I want to filter my messages based on their ts_ms time. The problem is I cannot get ts_ms from avro messages. This is my trimmed down avro .avsc file:

{
  "type": "record",
  "name": "Envelope",
  "namespace": "mysql.company.scores",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            <Some fields based on scores table>
          ],
          "connect.name": "mysql.company.scores.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
    {
      "name": "source",
      "type": {
        "type": "record",
        "name": "Source",
        "namespace": "io.debezium.connector.mysql",
        "fields": [
          {
            "name": "version",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "connector",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "server_id",
            "type": "long"
          },
          {
            "name": "ts_sec",
            "type": "long"
          },
          {
            "name": "gtid",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "file",
            "type": "string"
          },
          {
            "name": "pos",
            "type": "long"
          },
          {
            "name": "row",
            "type": "int"
          },
          {
            "name": "snapshot",
            "type": [
              {
                "type": "boolean",
                "connect.default": false
              },
              "null"
            ],
            "default": false
          },
          {
            "name": "thread",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "db",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "table",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "query",
            "type": [
              "null",
              "string"
            ],
            "default": null
          }
        ],
        "connect.name": "io.debezium.connector.mysql.Source"
      }
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    }
  ],
  "connect.name": "mysql.company.scores.Envelope"
}

I can access before or after, but when I can the following method with getTs_ms, I get symbol cannot be find method:

private boolean isRecordNew(mysql.company.scores.Envelope value){
        return value.getTs_ms() > 1580988600000L;
    }

This is the relevant part of my serde class:

public static Serde<mysql.company.scores.Envelope> getEnvelopeSerde() {
        SpecificAvroSerde<mysql.company.scores.Envelope> scoreSerde = new SpecificAvroSerde();
        scoreSerde.configure(
                Collections.singletonMap(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                        schemaRegistryUrl), false);
        return scoreSerde;
    }

Shall I be able to access the ts_ms field with the same serde class or I should change it in order to have it included in the value?


Solution

  • As @cricket_007 mentioned in a comment I looked at the generated class and the field is named getTsMs() and by using this method it was solved.