My producer stops sending messages after 14,116 messages have been sent. I have already raised the `nofile` limit from the default value to 1048576.
After around four to five minutes, the producer starts sending messages again, but then it stops again at 21,880 messages ...
I am so confused here, I don't know where the problem could be ... any idea guys?
Please refer to the code below for more details.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Date;
import java.sql.Time;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.sql.Timestamp;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.UUID;
public class KafkaCreateData extends Thread {
public static final String topic = "web_access";
public static String bootstrap_servers = "xxxxxxxxxxxxx:9092";
public static String zookeeper_connect = "xxxxxxxxxxxxx:2181";
public static int msg_sent_count = 0;
public static int userId = 0;
public static void createData() {
Entity entity = new Entity();
Properties props = new Properties();
//EC2(Kafka producer IP here)
props.put("bootstrap.servers", bootstrap_servers);
props.put("zookeeper.connect", zookeeper_connect);
props.put("group.id", "metric-group");
props.put("batch.size", 32768);
props.put("buffer.memory", 67108864);
props.put("send.buffer.bytes", 67108864);
props.put("receive.buffer.bytes", -1);
// props.put("max.block.ms", 1);
// props.put("linger.ms", 1);
// props.put("request.timeout.ms", 1);
// props.put("delivery.timeout.ms", 5);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //Key serialization
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value serialization
props.put("request.required.acks", "0");
KafkaProducer producer = new KafkaProducer<String, String>(props);
//phone brand
String phoneArray[] = {"iPhone", "htc", "google", "xiaomi", "huawei"};
//os
String onlineArray[] = {"y", "n"};
//city
String cityArray[] = {"Taipei","Hong Kong","London","Paris","Tokyo","New York","Singapore","Rome"};
//Generate Brand dandomly
int k = (int) (Math.random() * 5);
String phoneName = phoneArray[k];
//Generate os randomly
int m = (int) (Math.random() * 2);
String online = onlineArray[m];
//Generate City randomly
int n = (int) (Math.random() * 8);
String city = cityArray[n];
//Event Time Stamp
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss");
Date date = new Date();
String loginTime = sf.format(new Timestamp(date.getTime()));
// String user_id = UUID.randomUUID().toString();
//Loading Data into Entity
entity.setCity(city);
entity.setLoginTime(loginTime);
entity.setOnline(online);
entity.setPhoneName(phoneName);
userId = userId + 1;
entity.setuserId(userId);
ProducerRecord record = new ProducerRecord<String, String>(topic,JSON.toJSONString(entity));
producer.send(record);
System.out.println("sending message:"+ JSON.toJSONString(entity));
msg_sent_count = msg_sent_count + 1;
System.out.println("msg_sent_count: " + msg_sent_count);
}
public static void flink_streaming_job() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", bootstrap_servers);
props.put("zookeeper.connect", zookeeper_connect);
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
System.out.println("Before addSource");
env.addSource(
new FlinkKafkaConsumer011<>(
topic, new SimpleStringSchema(), props
)
// .setStartFromLatest()
)
// .setParallelism(9)
.map(string -> JSON.parseObject(string, Entity.class))
.addSink(new MysqlSink());
System.out.println("before execute");
env.execute("Flink add sink");
System.out.println("start to execute");
}
@Override
public void run() {
try {
createTheData();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void createTheData() throws InterruptedException {
while (true) {
createData();
Thread.sleep(1); // if this setup as 1, broker will be broken, setup as 500 to ensure it can works
}
}
public static void main(String[] args) throws Exception {
KafkaCreateData ConsumingMsgFromKafkaProducer = new KafkaCreateData();
ConsumingMsgFromKafkaProducer.start();
createData();
// Flink job on EMR
// flink_streaming_job();
}
}```
[1]: https://i.sstatic.net/7qVTW.png
[2]: https://i.sstatic.net/F59am.png
Sorry, I can't comment yet — could you try the following approaches: