
Flink SocketTextStream source scheduled to a single machine


I'm trying to understand the following behavior. My program reads from multiple socketTextStream sources, and each text stream feeds into a separate data flow (these streams never connect within my job). It looks something like this:

for (int i = 0; i < hosts.length; i++) {
    DataStream<String> someStream = env.socketTextStream(hosts[i], ports[i]);
    DataStream<Tuple2<String, String>> joinedAdImpressions = rawMessageStream.rebalance() ...
}

However, when I run the job on a cluster, all source tasks are scheduled to one machine, which then becomes a severe performance bottleneck. Any idea why this happens?

Thanks!


Solution

  • All the different SocketTextStreamFunction sources are scheduled to the same machine because of slot sharing. Slot sharing allows Flink to schedule tasks belonging to different operators into the same slot. This makes it possible, for example, to achieve better colocation between tasks that depend on each other (e.g. build side, probe side and the actual join operator running in the same slot). Moreover, it makes it easier to reason about how many slots your application needs: the number equals the maximum parallelism of your job.

    However, the downside is that independent components of your job won't be spread across the cluster but usually end up in the same slot(s) (consequently on the same machine, too) due to slot sharing.

    You can disable slot sharing for parts of your job by explicitly setting a different slot sharing group name. Only operators that are assigned to the same slot sharing group are subject to slot sharing. Downstream operators inherit the slot sharing group from their inputs. Thus, if you have an embarrassingly parallel job, it suffices to set the slot sharing group only at the sources.

    for (int i = 0; i < hosts.length; i++) {
        DataStream<String> someStream = env
            .socketTextStream(hosts[i], ports[i])
            .slotSharingGroup("socket_" + i);

        DataStream<Tuple2<String, String>> joinedAdImpressions = rawMessageStream.rebalance() ...
    }
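
To make the slot arithmetic above concrete, here is a minimal sketch in plain Java (no Flink dependency) of how the number of required slots changes once each pipeline gets its own slot sharing group. It assumes the rule that a job needs, per slot sharing group, as many slots as the highest operator parallelism in that group, summed over all groups. The class `SlotSharingSketch`, the `Op` type, the helper `buildJob`, and the parallelism of 4 for the downstream operator are all hypothetical and chosen only for illustration; they are not part of the Flink API or of the original job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotSharingSketch {

    /** Hypothetical stand-in for an operator: name, parallelism, slot sharing group. */
    static final class Op {
        final String name;
        final int parallelism;
        final String group;

        Op(String name, int parallelism, String group) {
            this.name = name;
            this.parallelism = parallelism;
            this.group = group;
        }
    }

    /** Assumed rule: slots needed = sum over groups of the max parallelism in each group. */
    static int requiredSlots(List<Op> ops) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Op op : ops) {
            maxPerGroup.merge(op.group, op.parallelism, Math::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    /**
     * Builds independent pipelines, each a socket source (parallelism 1) followed by
     * one downstream operator. If isolate is true, each pipeline gets its own group
     * ("socket_" + i), mirroring the loop above; otherwise everything stays in "default".
     */
    static List<Op> buildJob(int pipelines, int downstreamParallelism, boolean isolate) {
        List<Op> ops = new ArrayList<>();
        for (int i = 0; i < pipelines; i++) {
            String group = isolate ? "socket_" + i : "default";
            ops.add(new Op("source_" + i, 1, group));
            // The downstream operator inherits the group of its input.
            ops.add(new Op("downstream_" + i, downstreamParallelism, group));
        }
        return ops;
    }

    public static void main(String[] args) {
        // All three pipelines share the default group: 4 slots are enough.
        System.out.println(requiredSlots(buildJob(3, 4, false))); // prints 4
        // One group per pipeline: 3 * 4 = 12 slots are needed.
        System.out.println(requiredSlots(buildJob(3, 4, true)));  // prints 12
    }
}
```

With a single shared group, the whole job fits into as few slots as its maximum parallelism, which is why the independent sources can end up together. With one group per source, the job asks for more slots, so the tasks can be spread out; whether they actually land on different machines still depends on how many slots each TaskManager offers and on the scheduler.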