I'm aggregating over a keyed stream using a SingleOutputStreamOperator object in the form
stream = env.fromSource(...)...sideOutput(...).window(...).aggregate(...)
After I obtain the aggregates, I would like to send each record to a REST endpoint using a POST request. To do this I've written a RichAsyncFunction. Unfortunately, AsyncDataStream.unorderedWait(...) appears to be incompatible with SingleOutputStreamOperator and instead needs a more generic DataStream object. The compiler reports:
The method unorderedWait(DataStream<IN>, AsyncFunction<IN,OUT>, long, TimeUnit, int) in the type AsyncDataStream is not applicable for the arguments (SingleOutputStreamOperator<Tuple6<String,String,Long,Long,Long,Long>>, AsyncFunction<String,String>, long, TimeUnit, int)
How can I get a DataStream from the SingleOutputStreamOperator so that I can use the RichAsyncFunction I've created? Or should I just use a process window function instead?
SingleOutputStreamOperator<T> extends DataStream<T>, so I don't think that's your problem.
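It looks like you've defined an AsyncFunction<String,String>, so it expects a String as input, but based on the error message you're passing it a stream of Tuple6<String,String,Long,Long,Long,Long>. The input type parameter of the function has to match the element type of the stream, so either declare the function with the Tuple6 as its input type or map the aggregates to String before the async step.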
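To illustrate why the compiler complains, here is a minimal sketch that uses simplified stand-in classes, not Flink's real ones. The stand-ins only mirror the generic shape of DataStream, SingleOutputStreamOperator, AsyncFunction and unorderedWait; the Aggregate class is a hypothetical placeholder for your Tuple6, and the "POST ..." string stands in for whatever your REST call returns.

```java
import java.util.List;
import java.util.stream.Collectors;

class AsyncTypeDemo {
    // Stand-in types (assumptions for illustration, NOT Flink's classes).
    static class DataStream<T> {
        final List<T> elements;
        DataStream(List<T> elements) { this.elements = elements; }
    }

    static class SingleOutputStreamOperator<T> extends DataStream<T> {
        SingleOutputStreamOperator(List<T> elements) { super(elements); }
    }

    interface AsyncFunction<IN, OUT> {
        OUT invoke(IN input);
    }

    // Hypothetical placeholder for Tuple6<String,String,Long,Long,Long,Long>.
    static final class Aggregate {
        final String key;
        final long count;
        Aggregate(String key, long count) { this.key = key; this.count = count; }
    }

    // Same generic shape as unorderedWait(DataStream<IN>, AsyncFunction<IN,OUT>, ...):
    // IN must be the same type for the stream and for the function.
    static <IN, OUT> DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> fn) {
        return new DataStream<>(in.elements.stream().map(fn::invoke).collect(Collectors.toList()));
    }

    static List<String> run() {
        SingleOutputStreamOperator<Aggregate> aggregates =
            new SingleOutputStreamOperator<>(List.of(new Aggregate("k1", 3L)));

        // Compiles: the subclass is accepted where DataStream<IN> is expected,
        // and IN is inferred as Aggregate for both arguments.
        AsyncFunction<Aggregate, String> post = a -> "POST " + a.key + ":" + a.count;
        DataStream<String> responses = unorderedWait(aggregates, post);

        // Would NOT compile, for the same reason as the error in the question:
        // AsyncFunction<String, String> wrong = s -> s;
        // unorderedWait(aggregates, wrong);
        return responses.elements;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In your actual job the equivalent change would be declaring your function as a RichAsyncFunction<Tuple6<String,String,Long,Long,Long,Long>, String>, assuming a String response is what you want to emit downstream.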