I have an a java tcp socket application. Tcp communicator decodes the GPS location and inserts that data into the database and before inserting we do some selects and updates but all I do it using prepared statements. Right now, one thread of TCP communicator serves one device request.Immediately after creating the thread we get one connection from the pool. After decoding the GPS data is where we perform the multiple select, update and insert for each data. As number of devices are increasing, the number of concurrent connections to our Mysql database are also increasing.So I am trying to conduct a simulation and stress test something like below. The issue is that this is sequential test but in real environment the devices will be coming in parallel. How to achieve a near real stress situation for both mysql and java to find out how many inserts could mysql take in on second?
public class stress1 extends Thread {
public static void main(String[] argv) {
try {
for (int i = 0; i < 5000; i++) {
Socket socket = new Socket("192.168.2.102", 8000);
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
out.println("$A12345,30061104075130528955N10024852E000068*03A1*");
System.out.println(in.readLine() + i);
out.close();
in.close();
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Here is how my server socket looks like.
public class comm8888 {
HikariDataSource connectionPool = null;
private Socket receivedSocketConn1;
ConnectionHandler(Socket receivedSocketConn1) {
this.receivedSocketConn1=receivedSocketConn1;
}
Connection dbconn = null;
public void run() { // etc
DataOutputStream w = null;
DataInputStream r = null;
String message="";
receivedSocketConn1.setSoTimeout(60000);
dbconn = connectionPool.getConnection();
dbconn.setAutoCommit(false);
try {
w = new DataOutputStream(new BufferedOutputStream(receivedSocketConn1.getOutputStream()));
r = new DataInputStream(new BufferedInputStream(receivedSocketConn1.getInputStream()));
while ((m=r.read()) != -1){
//multiple prepared based sql select,update and insert here.
}
}
finally{
try {
if ( dbconn != null ) {
dbconn.close();
}
}
catch(SQLException ex){
ex.printStackTrace();
}
try{
if ( w != null ){
w.close();
r.close();
receivedSocketConn1.close();
}
}
catch(IOException ex){
ex.printStackTrace(System.out);
}
}
}
}
public static void main(String[] args) {
new comm8888();
}
comm8888() {
try {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/testdata");
config.setUsername("****");
config.setPassword("****");
config.setMaximumPoolSize(20);
connectionPool = new HikariDataSource(config); // setup the connection pool
}
catch (Exception e) {
e.printStackTrace(System.out);
}
try
{
final ServerSocket serverSocketConn = new ServerSocket(8888);
while (true){
try {
Socket socketConn1 = serverSocketConn.accept();
new Thread(new ConnectionHandler(socketConn1)).start();
}
catch(Exception e){
e.printStackTrace(System.out);
}
}
}
catch (Exception e) {
e.printStackTrace(System.out);
}
}
}
Your solution doesn't scale well. I would encapsulate the work to be done (GPS location of a device) in objects of some class, then put each unit of work in a queue. Finally, one thread can process all the work sequentially, processing one request at a time.
If one thread can't keep up and your queue becomes full, then it is very easy to scale by adding more workers to process jobs from the queue. (or if one instance of MySQL can't handle all the inserts, you can also try for example horizontal sharding of your data and adding multiple MySQL instances).
this is some sample code:
import java.sql.Connection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class QueueingDatabaseProcessingExample {
public static void main(String[] args) throws InterruptedException {
QueueingDatabaseProcessingExample a = new QueueingDatabaseProcessingExample();
a.doTheWork();
}
private void doTheWork() throws InterruptedException {
BlockingQueue<TcpCommunicatorUnitOfWork> blockingQueue = new ArrayBlockingQueue(1000);
// add work to queue as needed
blockingQueue.put(new TcpCommunicatorUnitOfWork("device id", 40.7143528, -74.0059731, 10)); // blocks if queue is full
Connection connection;
// get connection to the database from database pool
// process requests one by one sequentially
while (true) {
TcpCommunicatorUnitOfWork tcpCommunicatorUnitOfWork = blockingQueue.take(); // blocks if queue is empty
proccess(tcpCommunicatorUnitOfWork);
}
}
private void proccess(TcpCommunicatorUnitOfWork tcpCommunicatorUnitOfWork) {
// do queries, inserts, deletes to database
}
}
/**
* this class should have all the information needed to query/update the database
*/
class TcpCommunicatorUnitOfWork {
private final String deviceId;
private final double latitude;
private final double longitude;
private final int seaLevel;
public TcpCommunicatorUnitOfWork(String deviceId, double latitude, double longitude, int seaLevel) {
this.deviceId = deviceId;
this.latitude = latitude;
this.longitude = longitude;
this.seaLevel = seaLevel;
}
}