Tags: java, elasticsearch, apache-kafka, apache-flink, amazon-kinesis-analytics

Windowing is not triggered when we deploy the Flink application to Kinesis Data Analytics


We have an Apache Flink POC application which works fine locally, but after we deploy it to Kinesis Data Analytics (KDA) it does not emit any records into the sink.

Used technologies

Local

  • Source: Kafka 2.7
    • 1 broker
    • 1 topic with partition of 1 and replication factor 1
  • Processing: Flink 1.12.1
  • Sink: Managed ElasticSearch Service 7.9.1 (the same instance as in the AWS setup)

AWS

  • Source: Amazon MSK Kafka 2.8
    • 3 brokers (but we are connecting to one)
    • 1 topic with partition of 1, replication factor 3
  • Processing: Amazon KDA Flink 1.11.1
    • Parallelism: 2
    • Parallelism per KPU: 2
  • Sink: Managed ElasticSearch Service 7.9.1

Application logic

  1. The FlinkKafkaConsumer reads the messages in JSON format from the topic
  2. The JSON messages are mapped to domain objects called Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
    kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);

    consumer.setStartFromEarliest(); //Just for repeatable testing

    return environment
            .addSource(consumer)
            .map(new MapJsonToTelemetry());
}
  3. The Telemetry’s TimeStamp field is used as the event timestamp.
    3.1. With forMonotonousTimestamps
  4. The Telemetry’s StateIso is used for keyBy.
    4.1. The two-letter ISO code of the US state
  5. A 5-second tumbling event-time window strategy is applied
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
    WatermarkStrategy<Telemetry> wmStrategy =
            WatermarkStrategy
                    .<Telemetry>forMonotonousTimestamps()
                    .withTimestampAssigner((event, timestamp) -> event.TimeStamp);

    return telemetries
            .assignTimestampsAndWatermarks(wmStrategy)
            .keyBy(t -> t.StateIso)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new WindowCountFunction());
}
  6. A custom ProcessWindowFunction (WindowCountFunction) is called to perform some basic aggregation (a sketch of it follows the sink code below).
    6.1. We calculate a single StateAggregatedTelemetry per window
  7. ElasticSearch is configured as the sink.
    7.1. The StateAggregatedTelemetry data are mapped into a HashMap and pushed to the sink.
    7.2. All setBulkFlushXYZ methods are set to low values
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));

    ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            (ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
                Map<String, Object> record = new HashMap<>();

                record.put("stateIso", element.StateIso);
                record.put("healthy", element.Flawless);
                record.put("unhealthy", element.Faulty);
                ...

                LOG.info("Telemetry has been added to the buffer");
                indexer.add(Requests.indexRequest()
                        .index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
                        .source(record, XContentType.JSON));
            }
    );

    //Using low values to make sure that the Flush will happen
    esSinkBuilder.setBulkFlushMaxActions(25);
    esSinkBuilder.setBulkFlushInterval(1000);
    esSinkBuilder.setBulkFlushMaxSizeMb(1);
    esSinkBuilder.setBulkFlushBackoff(true);
    esSinkBuilder.setRestClientFactory(restClientBuilder -> {});

    LOG.info("Sink has been attached to the DataStream");
    telemetries.addSink(esSinkBuilder.build());
}
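
For reference, a simplified sketch of what our WindowCountFunction does (the real implementation differs; the isFaulty() check and the StateAggregatedTelemetry constructor used here are only illustrative assumptions):

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowCountFunction
        extends ProcessWindowFunction<Telemetry, StateAggregatedTelemetry, String, TimeWindow> {

    @Override
    public void process(String stateIso,
                        Context context,
                        Iterable<Telemetry> elements,
                        Collector<StateAggregatedTelemetry> out) {
        long flawless = 0;
        long faulty = 0;
        for (Telemetry telemetry : elements) {
            if (isFaulty(telemetry)) { //illustrative predicate, the real health criterion is omitted
                faulty++;
            } else {
                flawless++;
            }
        }
        //Assumed constructor; only StateIso, Flawless and Faulty are read by the sink
        out.collect(new StateAggregatedTelemetry(stateIso, flawless, faulty));
    }

    private boolean isFaulty(Telemetry telemetry) {
        //Placeholder for the actual health check
        return false;
    }
}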

Excluded things

  • We managed to put Kafka, KDA and ElasticSearch under the same VPC and same subnets to avoid the need to sign each request
  • From the logs we could see that Flink can reach the ES cluster.
    Request
{
    "locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
    "logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
    "message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}

Response

{
    "locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
    "logger": "org.elasticsearch.client.RestClient",
    "message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "DEBUG"
}
  • We could also verify on the Flink Dashboard that the messages had been read from the Kafka topic and sent for processing (data is sent and received between the tasks)

What we have tried without luck

  • We implemented a RichParallelSourceFunction which emits 1_000_000 messages and then exits
    • This worked well in the local environment
    • The job finished in the AWS environment, but there was no data on the sink side
  • We implemented another RichParallelSourceFunction which emits 100 messages every second (a sketch of it follows this list)
    • Basically we had two loops: an outer while(true) and an inner for loop
    • After the inner loop we called Thread.sleep(1000)
    • This worked perfectly fine in the local environment
    • But in AWS we could see that the checkpoint size grew continuously and no messages appeared in ELK
  • We have tried to run the KDA application with different parallelism settings
    • But there was no difference
  • We also tried different watermarking strategies (forBoundedOutOfOrderness, withIdleness, noWatermarks)
    • But there was no difference
  • We have added logs to the ProcessWindowFunction and to the ElasticsearchSinkFunction
    • Whenever we ran the application from IDEA, these logs appeared on the console
    • Whenever we ran the application in KDA, there were no such logs in CloudWatch
      • The logs that were added to main do appear in the CloudWatch logs
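
For reference, the throttled test source looked conceptually like this simplified sketch (not the exact code; how the test Telemetry objects are filled is only illustrative):

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ThrottledTelemetrySource extends RichParallelSourceFunction<Telemetry> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Telemetry> ctx) throws Exception {
        while (running) {                   //outer while(true)-style loop
            for (int i = 0; i < 100; i++) { //inner loop emits 100 messages
                ctx.collect(buildTestTelemetry(i));
            }
            Thread.sleep(1000);             //throttle to roughly 100 messages per second
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private Telemetry buildTestTelemetry(int i) {
        //Illustrative helper: fills only the fields the pipeline actually uses
        Telemetry telemetry = new Telemetry();
        telemetry.StateIso = (i % 2 == 0) ? "TX" : "CA";
        telemetry.TimeStamp = System.currentTimeMillis();
        return telemetry;
    }
}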

We suppose that we don't see data on the sink side because the window processing logic is not triggered. That would also explain why we don't see any processing logs in CloudWatch.

Any help would be more than welcome!


Update #1

  • We have tried to downgrade the Flink version from 1.12.1 to 1.11.1
    • There was no change
  • We have tried a processing-time window instead of event time (a sketch of that variant follows below)
    • It did not even work in the local environment
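
For reference, the processing-time attempt was essentially this change inside SetupProcessing (simplified sketch; no watermark strategy is needed in this variant because the window is driven by the task managers' wall clock):

return telemetries
        .keyBy(t -> t.StateIso)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .process(new WindowCountFunction());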

Update #2

The average message size is around 4kb. Here is an excerpt of a sample message:

{
  "affiliateCode": "...",
  "appVersion": "1.1.14229",
  "clientId": "guid",
  "clientIpAddr": "...",
  "clientOriginated": true,
  "connectionType": "Cable/DSL",
  "countryCode": "US",
  "design": "...",
  "device": "...",
  ...
  "deviceSerialNumber": "...",
  "dma": "UNKNOWN",
  "eventSource": "...",
  "firstRunTimestamp": 1609091112818,
  "friendlyDeviceName": "Comcast",
  "fullDevice": "Comcast ...",
  "geoInfo": {
    "continent": {
      "code": "NA",
      "geoname_id": 120
    },
    "country": {
      "geoname_id": 123,
      "iso_code": "US"
    },
    "location": {
      "accuracy_radius": 100,
      "latitude": 37.751,
      "longitude": -97.822,
      "time_zone": "America/Chicago"
    },
    "registered_country": {
      "geoname_id": 123,
      "iso_code": "US"
    }
  },
  "height": 720,
  "httpUserAgent": "Mozilla/...",
  "isLoggedIn": true,
  "launchCount": 19,
  "model": "...",
  "os": "Comcast...",
  "osVersion": "...",
  ...
  "platformTenantCode": "...",
  "productCode": "...",
  "requestOrigin": "https://....com",
  "serverTimeUtc": 1617809474787,
  "serviceCode": "...",
  "serviceOriginated": false,
  "sessionId": "guid",
  "sessionSequence": 2,
  "subtype": "...",
  "tEventId": "...",
  ...
  "tRegion": "us-east-1",
  "timeZoneOffset": 5,
  "timestamp": 1617809473305,
  "traits": {
    "isp": "Comcast Cable",
    "organization": "..."
  },
  "type": "...",
  "userId": "guid",
  "version": "v1",
  "width": 1280,
  "xb3traceId": "guid"
}

We are using ObjectMapper to parse only some of the fields of the JSON. Here is what the Telemetry class looks like:

public class Telemetry {
    public String AppVersion;
    public String CountryCode;
    public String ClientId;
    public String DeviceSerialNumber;
    public String EventSource;
    public String SessionId;
    public TelemetrySubTypes SubType; //enum
    public String TRegion;
    public Long TimeStamp;
    public TelemetryTypes Type; //enum
    public String StateIso;
    
    ...
}
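
The mapper itself is conceptually just a MapFunction wrapping Jackson's ObjectMapper. A simplified sketch is shown below; how the camelCase JSON properties are bound to the PascalCase fields above and how StateIso is derived are omitted here:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;

public class MapJsonToTelemetry implements MapFunction<String, Telemetry> {

    //Only a handful of the ~4 kB payload's fields are mapped, so unknown properties must be ignored
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    @Override
    public Telemetry map(String json) throws Exception {
        return MAPPER.readValue(json, Telemetry.class);
    }
}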

Update #3

Here is what the Flink Dashboard shows for the two operators:

Source

Subtasks tab

ID | Bytes received | Records received | Bytes sent | Records sent | Status
 0 | 0 B            | 0                | 0 B        | 0            | RUNNING
 1 | 0 B            | 0                | 2.83 MB    | 15,000       | RUNNING

Watermarks tab

No Data

Window

Subtasks tab

ID | Bytes received | Records received | Bytes sent | Records sent | Status
 0 | 1.80 MB        | 9,501            | 0 B        | 0            | RUNNING
 1 | 1.04 MB        | 5,499            | 0 B        | 0            | RUNNING

Watermarks

SubTask | Watermark
      1 | No Watermark
      2 | No Watermark

Solution

  • After a support session with the AWS folks it turned out that we had missed setting the time characteristic on the streaming environment.

    • In 1.11.1 the default value of TimeCharacteristic was ProcessingTime, so the auto watermark interval stayed at 0 ms and no periodic watermarks were ever generated.
    • Since Flink 1.12 (see the related release notes) the default value is EventTime:

    In Flink 1.12 the default stream time characteristic has been changed to EventTime, thus you don’t need to call this method for enabling event-time support anymore.

    So, after we set EventTime explicitly, it started to generate watermarks like a charm:

    streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
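
For completeness, a simplified sketch of how that call fits into our main method (the job name is only illustrative):

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamingEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    //Required on KDA's Flink 1.11.1, where EventTime is not the default time characteristic
    //(deprecated and unnecessary from Flink 1.12 onwards)
    streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<Telemetry> telemetries = SetupKafkaSource(streamingEnv);
    SingleOutputStreamOperator<StateAggregatedTelemetry> aggregated = SetupProcessing(telemetries);
    SetupElasticSearchSink(aggregated);

    streamingEnv.execute("telemetry-aggregation");
}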