
Custom Flink sink invoked but with no data


I am trying to implement a custom sink. For now, I have created a stubbed invoke function that just logs the received data to the task log file, as shown below.

package io.name.package;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

public class AlertSink extends RichSinkFunction<Alert> {
    Logger LOG = LoggerFactory.getLogger(AlertSink.class);

    @Override
    public void invoke(Alert alert, Context context) throws Exception {
        LOG.info("Invoking sink for alert: ", alert.toString());
    }
}

I have configured the data stream as shown below.

        DataStream<Alert> result = filteredMetrics
            .keyBy(
                new KeySelector<Tuple7<String, String, String, String, String, String, Object>, Tuple3<String, String, String>>() {
                    @Override
                    public  Tuple3<String, String, String> getKey(Tuple7<String, String, String, String, String, String, Object> in) throws Exception {
                        return Tuple3.of(in.f0, in.f1, in.f2);
                    }
            })
            .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .process(new ThresholdEvaluator());

        result.addSink(new AlertSink());

When I checked the logs, I saw that the sink was invoked but logged an empty string, whereas ThresholdEvaluator, which emits the Alert, logged a non-empty string.

2020-08-05 19:38:16,638 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,638 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-production-graph-service-account-toke644zm","period":"5m","isActive":true,"status":"new","firstSeen":1596656296638,"lastSeen":1596656296638,"count":1}
2020-08-05 19:38:16,640 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,640 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-staging-input-service-account-token-q57xf","period":"5m","isActive":true,"status":"new","firstSeen":1596656296640,"lastSeen":1596656296640,"count":1}
2020-08-05 19:38:16,643 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,643 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-restructure-repo-cmi-service-account-k76cd","period":"5m","isActive":true,"status":"new","firstSeen":1596656296643,"lastSeen":1596656296643,"count":1}
2020-08-05 19:38:16,646 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,646 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-integrations-14361530-demo-slack-token","period":"5m","isActive":true,"status":"new","firstSeen":1596656296645,"lastSeen":1596656296645,"count":1}

Am I missing something here?

I also tried adding a map function between the ThresholdEvaluator and addSink operators. The MapFunction receives the Alert object fine, but AlertSink still does not seem to.

        result.map(new MapFunction<Alert, Alert>() {
            @Override
            public Alert map(Alert value) {
                LOG.info(value.toString());
                return value;
            }
        }).addSink(new AlertSink());

(updated with additional log)


Solution

  • The reason is that the log message has no placeholder into which the value can be interpolated, so the extra argument is silently dropped. The right syntax is

    LOG.info("Invoking sink for alert: {}", alert.toString());
    

    The difference between this and

    LOG.info("Invoking sink for alert: " + alert.toString());
    

    is that in the latter case the string concatenation occurs every time, regardless of the log level, whereas in the former case the value is interpolated only if the log level is at least INFO.
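
    To make the effect of the missing placeholder concrete, here is a minimal, dependency-free sketch. The class PlaceholderDemo and its format helper are hypothetical stand-ins that only mimic the way "{}" placeholders consume arguments; they are not SLF4J's actual implementation, which handles more cases (escaping, arrays, trailing Throwables).

```java
public class PlaceholderDemo {

    // Hypothetical, simplified stand-in for "{}" substitution:
    // each "{}" in the pattern consumes the next argument, and any
    // argument without a matching placeholder is silently ignored.
    static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // no placeholder left: remaining arguments are dropped
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2; // skip past the "{}" that was just filled
        }
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        String alert = "{\"status\":\"new\"}";

        // No placeholder: the alert never reaches the output,
        // which matches the empty log lines in the question.
        System.out.println(format("Invoking sink for alert: ", alert));

        // With a placeholder: the alert is interpolated.
        System.out.println(format("Invoking sink for alert: {}", alert));
    }
}
```

    With real SLF4J, passing alert itself rather than alert.toString() additionally defers the toString() call until the message is actually formatted.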