I'm trying to sink two windowed streams to the same Kinesis sink. When I do this, no results make it to the sink (code below). If I remove one of the windows from the job, results do get published; adding the second stream to the sink seems to void both.
How can I have results from both windowed streams go to the same sink?
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    ObjectMapper jsonParser = new ObjectMapper();
    DataStream<String> inputStream = createKinesisSource(env);
    FlinkKinesisProducer<String> kinesisSink = createKinesisSink();

    WindowedStream oneMinStream = inputStream
        .map(value -> jsonParser.readValue(value, JsonNode.class))
        .keyBy(node -> node.get("accountId"))
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)));
    oneMinStream
        .aggregate(new LoginAggregator("k1m"))
        .addSink(kinesisSink);

    WindowedStream twoMinStream = inputStream
        .map(value -> jsonParser.readValue(value, JsonNode.class))
        .keyBy(node -> node.get("accountId"))
        .window(TumblingProcessingTimeWindows.of(Time.minutes(2)));
    twoMinStream
        .aggregate(new LoginAggregator("k2m"))
        .addSink(kinesisSink);

    try {
        env.execute("Flink Kinesis Streaming Sink Job");
    } catch (Exception e) {
        // Passing the exception logs its message and full stack trace;
        // e.getStackTrace().toString() would only print the array reference.
        LOG.error("Flink Kinesis Streaming Sink Job failed", e);
        throw e;
    }
}

private static DataStream<String>
        createKinesisSource(StreamExecutionEnvironment env) {
    Properties inputProperties = new Properties();
    inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
        "LATEST");
    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
        new SimpleStringSchema(), inputProperties));
}

private static FlinkKinesisProducer<String> createKinesisSink() {
    Properties outputProperties = new Properties();
    outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    outputProperties.setProperty("AggregationEnabled", "false");
    FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(
        new SimpleStringSchema(), outputProperties);
    sink.setDefaultStream(outputStreamName);
    sink.setDefaultPartition(UUID.randomUUID().toString());
    return sink;
}
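You want to .union() the oneMinStream and twoMinStream results together, and then add your sink once, to that unioned stream. Note that union() is defined on DataStream, not on WindowedStream, so you union the streams returned by .aggregate(...); both must have the same element type (String here).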
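A minimal sketch of that suggestion, assuming a Flink 1.x release from 1.12 on (for executeAndCollect() and WatermarkStrategy) is on the classpath. To keep it self-contained and finite, several things are swapped in: a small bounded fromElements source replaces the Kinesis consumer, event-time windows replace the processing-time windows (so the windows fire when the bounded input ends), CountAggregator is a hypothetical stand-in for LoginAggregator, and executeAndCollect() stands in for the single Kinesis sink. In the real job, the last step would instead be oneMin.union(twoMin).addSink(kinesisSink).

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.CloseableIterator;

public class UnionWindowsExample {

    // Hypothetical stand-in for LoginAggregator: counts elements per key and window,
    // tagging the result with a label that identifies which window produced it.
    public static class CountAggregator implements AggregateFunction<String, Long, String> {
        private final String label;

        public CountAggregator(String label) { this.label = label; }

        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(String value, Long acc) { return acc + 1; }
        @Override public String getResult(Long acc) { return label + ":" + acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    public static List<String> runJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Bounded stand-in for the Kinesis source: each element is an accountId,
        // all stamped with event time 0 so they land in the first window.
        DataStream<String> input = env
            .fromElements("acct-1", "acct-2", "acct-1")
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forMonotonousTimestamps()
                    .withTimestampAssigner((value, ts) -> 0L));

        // Aggregate each window size separately; both results are DataStream<String>.
        DataStream<String> oneMin = input
            .keyBy(value -> value)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new CountAggregator("k1m"));
        DataStream<String> twoMin = input
            .keyBy(value -> value)
            .window(TumblingEventTimeWindows.of(Time.minutes(2)))
            .aggregate(new CountAggregator("k2m"));

        // union() merges both result streams so exactly one sink is attached.
        DataStream<String> merged = oneMin.union(twoMin);

        // Collecting sink used here in place of addSink(kinesisSink).
        List<String> results = new ArrayList<>();
        try (CloseableIterator<String> it = merged.executeAndCollect()) {
            it.forEachRemaining(results::add);
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runJob());
    }
}
```

The order of the collected results is not guaranteed, since elements from the two unioned inputs interleave; with the three sample records, the sorted output should be one count per account for each window size.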