My aim is to collect real-time data using a ScheduledExecutorService that scans the database every 5 seconds, captures any new data points and then does some calculations on them. The data arrives in the database at irregular times (e.g. 3 data points in 2 seconds, or 5 data points in 2 seconds), and each data point takes about 10 seconds to calculate.
My question is: how do I update my stack of captured data without interfering with the calculation side? I also considered using a ScheduledExecutorService for the calculation part, since the calculation takes longer than the scanning. Is there any implementation I can refer to? I have run out of ideas, and I don't have a working sample, only pseudocode. I tried a horrible while-loop approach, which I then had to kill via its PID, as in this excerpt:
```java
// how to process real-time data
while (true) {
    if (calculationBegins) { // when the first data point arrives
        while (true) {
            int oldDataSize = RealTimeData.size();
            for (int i = 0; i < oldDataSize; i++) {
                //
                // complex calculation that takes 10 sec
                //
            }
            // if new data arrived, break out and proceed with the calculation
            while (true) {
                if (RealTimeData.size() > oldDataSize) break; // the ScheduledExecutor updates the size
            }
        }
    }
}
```
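The inner `while` loop above spins a CPU core at 100% while it waits for new data. A common alternative for handing data from one thread to another is a `BlockingQueue`: the scanning side adds each new data point, and the calculation side calls `take()`, which blocks until an element is available and consumes each point once. A minimal sketch, assuming a plain thread stands in for the ScheduledExecutorService, `List.of(...)` stands in for the rows read from the database, and a hypothetical `POISON` marker tells the consumer to stop (Java 9+ for `List.of`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandOff {

    // Hypothetical end-of-stream marker so the consumer knows when to stop (demo only).
    static final String POISON = "__END__";

    /** Producer pushes data points, consumer "calculates" them; returns what the consumer processed. */
    static List<String> run(List<String> incoming) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String point = queue.take(); // blocks (no spinning) until a data point arrives
                    if (POISON.equals(point)) break;
                    processed.add("calc(" + point + ")"); // placeholder for the 10-second calculation
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer: in the real program this would be the scheduled DB scan adding new rows.
        for (String point : incoming) {
            queue.add(point);
        }
        queue.add(POISON);

        try {
            consumer.join(); // join() also makes the consumer's writes to `processed` visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a,1", "b,2", "c,3"))); // prints [calc(a,1), calc(b,2), calc(c,3)]
    }
}
```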
Here is the full code (two files):

```java
// Test.java
package test;

import static test.HikariCPDataSource.calculationBegins;
import static test.HikariCPDataSource.RealTimeData;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test {
    static List<String> capture = new ArrayList<String>();

    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        Runnable extract = () -> {
            HikariCPDataSource.getData(args[0]);
        };
        service.scheduleAtFixedRate(extract, 0, 5, TimeUnit.SECONDS);

        // how to process real-time data
        while (true) {
            if (calculationBegins) {
                while (true) {
                    int oldDataSize = RealTimeData.size();
                    for (int i = 0; i < oldDataSize; i++) {
                        //
                        // complex calculation that takes 10 sec
                        //
                    }
                    // if new data arrived, break out and proceed with the calculation
                    while (true) {
                        if (RealTimeData.size() > oldDataSize) break;
                    }
                }
            }
        }
    }
}
```
```java
// HikariCPDataSource.java
package test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class HikariCPDataSource {
    private static HikariConfig config = new HikariConfig();
    private static HikariDataSource ds;

    // volatile: written by the scheduler thread, read by the main (calculation) thread
    public static volatile boolean calculationBegins = false; // true: start calculating when the very first data point arrives
    // synchronized wrapper: the scheduler thread adds while the main thread reads size()
    public static List<String> RealTimeData = Collections.synchronizedList(new ArrayList<>()); // real-time data captured from the database
    public static int index = 0; // only used by the scheduler thread

    static {
        config.setJdbcUrl("jdbc:mysql://127.0.0.1/testdb");
        config.setUsername("root");
        config.setPassword("caspo123");
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.setMaximumPoolSize(2);
        config.setMinimumIdle(1);
        config.setIdleTimeout(0);
        config.setMaxLifetime(0);
        ds = new HikariDataSource(config);
    }

    private HikariCPDataSource() {}

    public static Connection getConnection() throws SQLException {
        return ds.getConnection();
    }

    public static void setCalculationBegins() {
        calculationBegins = true;
    }

    public static int getSizeHikariScout() {
        return RealTimeData.size();
    }

    public static int getIndex() {
        return index;
    }

    // advances the index by the number of rows just read
    public static void resetIndex(int updateIndex) {
        index = index + updateIndex;
    }

    public static void getData(String input1) {
        // `index` is a reserved word in MySQL, so the column name is quoted with backticks
        String sql = "SELECT column1, column2 FROM testdb WHERE id = ? AND `index` > ?";
        // try-with-resources closes the ResultSet, PreparedStatement and Connection, in that order
        try (Connection con = ds.getConnection();
             PreparedStatement pstmt = con.prepareStatement(sql)) {
            pstmt.setString(1, input1);
            pstmt.setInt(2, getIndex()); // second placeholder: `index` > ?
            int indexCount = 0;
            try (ResultSet resultSet = pstmt.executeQuery()) {
                while (resultSet.next()) {
                    if (!calculationBegins) setCalculationBegins();
                    String output = String.join(",", resultSet.getString("column1"), resultSet.getString("column2"));
                    RealTimeData.add(output);
                    indexCount++;
                }
            }
            resetIndex(indexCount);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
```
One solution I see here (and I have actually done something similar myself) is to create two ExecutorServices: a ScheduledExecutorService and a regular fixed-size thread pool (Executors.newFixedThreadPool).
The first one runs periodically, let's say every 3 seconds. It only scans the DB for new data points and, if it finds any, submits (queues) a calculation task for each of them to the other executor.
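A minimal sketch of the scanner side, assuming an in-memory `CopyOnWriteArrayList` stands in for the database table and a string tag stands in for the calculation (both hypothetical; the real code would run the query from `getData` and the actual calculation). The scheduled task only picks up rows it has not seen yet and hands each one to the worker pool with `submit()`, so a slow calculation never delays the next scan:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScanAndSubmit {

    /** Feeds `rows` into a simulated table in two batches and returns what the workers produced. */
    static Set<String> run(List<String> rows, long scanPeriodMs, long timeoutMs) {
        List<String> table = new CopyOnWriteArrayList<>(); // hypothetical stand-in for the DB table
        Set<String> results = ConcurrentHashMap.newKeySet();
        CountDownLatch allDone = new CountDownLatch(rows.size());

        ScheduledExecutorService scanner = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(2);

        // Periodic runs never overlap and each run happens-before the next,
        // so this plain counter needs no extra synchronization.
        int[] lastIndex = {0};

        scanner.scheduleAtFixedRate(() -> {
            try {
                List<String> snapshot = new ArrayList<>(table);
                List<String> fresh = snapshot.subList(lastIndex[0], snapshot.size());
                lastIndex[0] = snapshot.size();
                for (String row : fresh) {
                    // Only queue the work here; the scan itself stays short.
                    workers.submit(() -> {
                        results.add("calc(" + row + ")"); // placeholder for the 10-second calculation
                        allDone.countDown();
                    });
                }
            } catch (RuntimeException e) {
                // An exception escaping a periodic task cancels all its future runs, so catch it here.
                e.printStackTrace();
            }
        }, 0, scanPeriodMs, TimeUnit.MILLISECONDS);

        // Simulate rows arriving at irregular times.
        int half = rows.size() / 2;
        table.addAll(rows.subList(0, half));
        sleepQuietly(3 * scanPeriodMs);
        table.addAll(rows.subList(half, rows.size()));

        try {
            allDone.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scanner.shutdownNow();
        workers.shutdown();
        return results;
    }

    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a,1", "b,2", "c,3", "d,4"), 50, 5_000));
    }
}
```

Catching exceptions inside the periodic task matters: according to the ScheduledExecutorService documentation, if one execution throws, subsequent executions are suppressed, so a single failed query would otherwise stop the scanning for good.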
The second executor does all the calculation and whatever else you want to do with the data. Submitted tasks are queued and executed as soon as a thread becomes idle.
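To illustrate the queuing behaviour, here is a small sketch with a fixed pool and `Thread.sleep()` standing in for the 10-second calculation (the class name and timings are illustrative). It records how many tasks run at the same time: with a pool of N threads that number never exceeds N, and the remaining tasks wait in the executor's queue until a thread is free.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class QueuedCalculations {

    /**
     * Submits `tasks` simulated calculations to a pool of `poolSize` threads.
     * Returns {maximum number running at once, number completed}.
     */
    static int[] run(int poolSize, int tasks, long calcMillis) {
        ExecutorService workers = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();

        for (int i = 0; i < tasks; i++) {
            // submit() returns immediately; tasks beyond poolSize wait in the pool's queue
            futures.add(workers.submit(() -> {
                maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(calcMillis); // stand-in for the ~10-second calculation
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            }));
        }

        for (Future<?> f : futures) {
            try {
                f.get(); // wait for each calculation in turn
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
        workers.shutdown();
        return new int[] {maxRunning.get(), completed.get()};
    }

    public static void main(String[] args) {
        int[] r = run(2, 6, 200);
        System.out.println("max running at once: " + r[0] + ", completed: " + r[1]);
    }
}
```

The pool size is the parameter to tune. With a 10-second calculation and data arriving faster than one point per 10 seconds, a single thread falls further and further behind, and since `Executors.newFixedThreadPool` uses an unbounded queue, the backlog can grow without limit.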
This way, all thread management and task-queue handling is done by the executors, and you don't have to worry about it yourself.
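The executors still need to be stopped in a sensible order when the program exits, which also removes the need to kill the process by PID. A sketch, assuming a scanner and a worker pool like the ones described above (`shutdownGracefully` and `demo` are hypothetical names): stop the scanner first so no new tasks are queued, then call `shutdown()` on the worker pool, which rejects new tasks but still runs the ones already submitted, and wait for both with `awaitTermination()`:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PipelineShutdown {

    /** Stops the scanner, then drains the workers. Returns true if both terminated in time. */
    static boolean shutdownGracefully(ScheduledExecutorService scanner, ExecutorService workers,
                                      long timeout, TimeUnit unit) {
        try {
            scanner.shutdown(); // by default, future runs of periodic tasks are cancelled
            boolean ok = scanner.awaitTermination(timeout, unit);
            workers.shutdown(); // rejects new tasks but still runs the ones already queued
            ok &= workers.awaitTermination(timeout, unit);
            if (!ok) {
                scanner.shutdownNow();
                workers.shutdownNow(); // last resort: interrupts running calculations
            }
            return ok;
        } catch (InterruptedException e) {
            scanner.shutdownNow();
            workers.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Queues `tasks` short calculations on one worker, shuts down, and counts completions (-1 on timeout). */
    static int demo(int tasks) {
        ScheduledExecutorService scanner = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(1);
        AtomicInteger completed = new AtomicInteger();

        scanner.scheduleAtFixedRate(() -> { /* periodic DB scan would go here */ }, 0, 10, TimeUnit.MILLISECONDS);
        for (int i = 0; i < tasks; i++) {
            workers.submit(() -> {
                try {
                    Thread.sleep(20); // stand-in for the calculation
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        boolean ok = shutdownGracefully(scanner, workers, 5, TimeUnit.SECONDS);
        return ok ? completed.get() : -1;
    }

    public static void main(String[] args) {
        // every calculation queued before shutdown() still runs
        System.out.println("completed: " + demo(5));
    }
}
```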