
Java multi-threaded application: Thread.sleep(100) takes almost forever


I'm trying to fix an application that sends a number of emails to clients. The sending is done by a multi-threaded Java app (following the Producer-Consumer model), where the Producer fetches the list of messages from the database and the Consumers call a Python script to send the mail.

From one day to the next, my boss told me that the program had stopped working. I don't know what happened; maybe something changed, but the sysadmins keep saying nothing has happened. The program was working yesterday and then suddenly slowed down, and now it sends only 2-3 messages per hour instead of the previous 1000.

The developer is no longer available, so I have to fix it myself.

I cleared the database: it only contained old data that was no longer needed, and it was huge (6 GB of data, 7M rows) and extremely slow. But the problem remained.

Now I'm logging to the screen from the Java app, and I see that these lines:

    try
    {
      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" is sleeping for "+consumerSleepTime+" ms ::");
      Thread.sleep(consumerSleepTime);
      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" sleeptime over in try block ::");
    }
    catch (InterruptedException e)
    {
      e.printStackTrace();
      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
      Thread.currentThread().interrupt();
      break main_while_loop;
    }

"stay" in sleep, and I see:

11:24:15:: consumer_thread_1 is sleeping for 100 ms ::

Nothing else happens. No "sleeptime over", no stack trace; it looks like the sleep takes forever. However, if I keep the process running, then occasionally, after a random amount of time, the threads continue and then sleep again... for hours.

Any ideas?

Here are a few files, maybe they're needed to understand the whole structure...

runjava

#!/bin/sh
java -cp /usr/share/java/mysql-connector-java.jar:. newthread

newthread.java

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.sql.SQLException;
import java.sql.Driver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.io.Closeable;
import java.io.IOException;


class ReentrantLockTest
{
  int maxSize = 100;
  ArrayList boundedBuffer = new ArrayList( maxSize );

  ReentrantLock lock = new ReentrantLock(false);
  Condition telire_var = lock.newCondition(),
            uresre_var = lock.newCondition();

  volatile boolean shutdown = false;
  long producerSleepTime = 5000;
  long consumerSleepTime = 100;

  long mainDriverWaitTime = 1000;

  int totalProduerThreads = 1;

  int totalConsumerThreads = 2;
  long criticalSectionDelay = 0;
  int currentIndex = -1;
  String sqlhost = "localhost";
  String sqluser = "user";
  String sqlpassword = "pass";
  String sqldb = "db";
  String mysqlUrl = "";

  public ReentrantLockTest()
  {
    System.out.println(sqluser);
    java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);

    ArrayList<Thread> threads = new ArrayList<Thread>();
    ThreadGroup tg = new ThreadGroup("ReentrantLockTest Thread Group");
    // get sql
    mysqlUrl = "jdbc:mysql://"+ sqlhost +"/"+ sqldb +"?autoReconnect=true";

    try
    {
      Class.forName("com.mysql.jdbc.Driver").newInstance();
    }
    catch (Exception e)
    {
      System.out.println(e);
      return;
    }

    for (int i = 1; i<=totalProduerThreads; i++)
    {
      threads.add( new Thread( tg , new ReentrantLockTest.Producer() , "producer_thread_"+i) );
    }
    for (int i = 1; i<=totalConsumerThreads; i++)
    {
      threads.add( new Thread( tg , new ReentrantLockTest.Consumer() , "consumer_thread_"+i) );
    }
    for (Thread t: threads)
      t.start();

    try
    {
      while (true)
      {
        for (Thread t: threads)
          t.resume();
        Thread.sleep(mainDriverWaitTime);
      }
    }
    catch (InterruptedException e)
    {
      e.printStackTrace();
    }
    finally
    {
      shutdown = true;
      System.out.println(shortTime.format(new java.util.Date()) + ":: ReentrantLockTest - setting shutdown to true ::");
    }

    System.out.println(":: ReentrantLockTest - signalling interrupt and waiting for "+tg.activeCount()+" threads to die ::");

    for (Thread t: threads)
    {
      System.out.println(shortTime.format(new java.util.Date()) + ":: Interrupting "+t.getName()+" ::");
      try
      {
        t.interrupt();
      }
      catch (Exception e)
      {
        e.printStackTrace();
      }
    }
    for (Thread t: threads)
    {
      try
      {
        StringBuilder sb = new StringBuilder( t.getName() );
        System.out.println(shortTime.format(new java.util.Date()) + ":: Waiting for "+sb+" to die ::");
        t.join();
        System.out.println(shortTime.format(new java.util.Date()) + ":: "+sb+" is dead ::" );
      }
      catch (InterruptedException e)
      {
        e.printStackTrace();
      }
    }
  }

  public class Consumer implements Runnable
  {
    java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);
    private void close(Closeable c)
    {
      if (c != null)
      {
        try
        {
          c.close();
        }
        catch (IOException e)
        {
          // ignored
        }
      }
    }

    public void run()
    {
      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been started ::");
      main_while_loop: while( shutdown == false && !Thread.currentThread().isInterrupted() )
      {
        try
        {
          lock.lock();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" acquired the lock ::");
          Thread.sleep(criticalSectionDelay);

          if (currentIndex == -1)
          {
            uresre_var.signal();
            System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" signalling waiting_on_full_buffer ::");
          }

          while(currentIndex == -1)
          {
            System.out.println(shortTime.format(new java.util.Date()) + ":: buffer is empty - "+Thread.currentThread().getName()+" is going to wait ::");
            telire_var.await();
          }//while condition waiting_on_empty_buffer

          // sendmail
          String mail_id = boundedBuffer.remove(currentIndex).toString();
          currentIndex--;
          Process proc = null;

          System.out.println(shortTime.format(new java.util.Date()) + ":: buffer size: "+ currentIndex);

          lock.unlock();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" released the lock ::");

          try
          {
            System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName() +" starts python ("+ mail_id +")");

            // this random wait before sending is not needed
            /*Random generator = new Random();
            int randomWait = generator.nextInt(1000) + 100;
            Thread.sleep(randomWait);
            System.out.println(" => "+ randomWait);*/

            proc = Runtime.getRuntime().exec("python ./new_mail.py "+ mail_id);
            proc.waitFor();
            System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName() +" ends python");
          }
          catch (Exception e)
          {
            e.printStackTrace();
          }
          finally
          {
            if (proc != null)
            {
              close(proc.getOutputStream());
              close(proc.getInputStream());
              close(proc.getErrorStream());
              proc.destroy();
              System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName() +" close python proc");
            }
          }
        }
        catch (InterruptedException e)
        {
          e.printStackTrace();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
          Thread.currentThread().interrupt();
          break main_while_loop;
        }
        try
        {
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" is sleeping for "+consumerSleepTime+" ms ::");
          Thread.sleep(consumerSleepTime);
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" sleeptime over in try block ::");
        }
        catch (InterruptedException e)
        {
          e.printStackTrace();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
          Thread.currentThread().interrupt();
          break main_while_loop;
        }
        System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" sleeptime over ::");
      }//end while: main_while_loop

      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been shutdown ::");
    }//end run
  }

  public class Producer implements Runnable
  {
    java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);
    public void run()
    {
      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been started ::");
      main_while_loop: while( shutdown == false && !Thread.currentThread().isInterrupted() )
      {
        try
        {
          lock.lock();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" acquired the lock ::");

          while (currentIndex != -1)
          {
            System.out.println(shortTime.format(new java.util.Date()) + ":: the buffer still has items - "+Thread.currentThread().getName()+" is waiting ::");
            uresre_var.await();
          }//while condition waiting_on_full_buffer

          Thread.sleep(criticalSectionDelay);

          Connection con = null;
          Statement st = null;
          ResultSet rs = null;
          try
          {
            con = DriverManager.getConnection(mysqlUrl, sqluser, sqlpassword);
            st = con.createStatement();
            rs = st.executeQuery("SELECT mail_id FROM mail_queue WHERE status = 0 OR (status BETWEEN 2 AND 100 AND proof_counter <= 3 AND UNIX_TIMESTAMP() - proof_stamp > 15*60 ) ORDER BY status, mail_id DESC LIMIT "+ maxSize);

            boundedBuffer = new ArrayList(maxSize);
            currentIndex = -1;

            while(rs.next())
            {
              String mail_id = Integer.toString(rs.getInt(1));
              boundedBuffer.add(mail_id);
              currentIndex++;
            }
            con.close();
          }
          catch (Exception err)
          {
            System.out.println(err);
            break;
          }

          telire_var.signal();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" signalling waiting_on_empty_buffer ::");
        }
        catch (InterruptedException e)
        {
          e.printStackTrace();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
          Thread.currentThread().interrupt();
          break main_while_loop;
        }
        finally
        {
          lock.unlock();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" released the lock ::");
        }

        try
        {
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" is sleeping for "+producerSleepTime+" ms ::");
          Thread.sleep(producerSleepTime);
        }
        catch (InterruptedException e)
        {
          e.printStackTrace();
          System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
          Thread.currentThread().interrupt();
          break main_while_loop;
        }
      }

      System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" has been shutdown ::");
    }
  }
}

class newthread
{
  public static void main(String[] args)
  {
    new ReentrantLockTest();
  }
}

Any help is appreciated!!

Edit:

Here is a thread dump:

2012-07-05 12:15:06
Full thread dump OpenJDK 64-Bit Server VM (14.0-b16 mixed mode):

"SIGHUP handler" daemon prio=10 tid=0x00007f7bb0001000 nid=0x838 runnable [0x00007f7beb205000]
   java.lang.Thread.State: RUNNABLE
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:636)

"consumer_thread_2" prio=10 tid=0x00007f7d940f4800 nid=0x68c runnable [0x00007f7beb51b000]
   java.lang.Thread.State: RUNNABLE
        at java.io.PrintStream.write(PrintStream.java:446)
        - locked <0x00007f7d137703b0> (a java.io.PrintStream)
        at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:220)
        at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:290)
        at sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:103)
        - locked <0x00007f7d137704a8> (a java.io.OutputStreamWriter)
        at java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:185)
        at java.io.PrintStream.write(PrintStream.java:494)
        - locked <0x00007f7d137703b0> (a java.io.PrintStream)
        at java.io.PrintStream.print(PrintStream.java:636)
        at java.io.PrintStream.println(PrintStream.java:773)
        - locked <0x00007f7d137703b0> (a java.io.PrintStream)
        at ReentrantLockTest$Consumer.run(newthread.java:151)
        at java.lang.Thread.run(Thread.java:636)

"consumer_thread_1" prio=10 tid=0x00007f7d940f4000 nid=0x68b waiting on condition [0x00007f7beb61c000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f7d137acee0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:871)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
        at ReentrantLockTest$Consumer.run(newthread.java:150)
        at java.lang.Thread.run(Thread.java:636)

"producer_thread_1" prio=10 tid=0x00007f7d940e5000 nid=0x68a waiting on condition [0x00007f7beb71d000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f7d137acf38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at ReentrantLockTest$Producer.run(newthread.java:250)
        at java.lang.Thread.run(Thread.java:636)

"Low Memory Detector" daemon prio=10 tid=0x00007f7d940af800 nid=0x688 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread1" daemon prio=10 tid=0x00007f7d940ac800 nid=0x687 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread0" daemon prio=10 tid=0x00007f7d940aa800 nid=0x686 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00007f7d940a8800 nid=0x685 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x00007f7d94087800 nid=0x683 in Object.wait() [0x00007f7bebd36000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00007f7d13761210> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:133)
        - locked <0x00007f7d13761210> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:149)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:177)

"Reference Handler" daemon prio=10 tid=0x00007f7d94085800 nid=0x682 in Object.wait() [0x00007f7bebe37000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00007f7d13761078> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
        - locked <0x00007f7d13761078> (a java.lang.ref.Reference$Lock)

"main" prio=10 tid=0x00007f7d94007000 nid=0x673 sleeping[0x00007f7d99285000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at ReentrantLockTest.<init>(newthread.java:81)
        at newthread.main(newthread.java:320)

"VM Thread" prio=10 tid=0x00007f7d94080800 nid=0x681 runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f7d94011800 nid=0x674 runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f7d94013000 nid=0x675 runnable

"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f7d94015000 nid=0x676 runnable

"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f7d94017000 nid=0x677 runnable

"GC task thread#4 (ParallelGC)" prio=10 tid=0x00007f7d94018800 nid=0x678 runnable

"GC task thread#5 (ParallelGC)" prio=10 tid=0x00007f7d9401a800 nid=0x679 runnable

"GC task thread#6 (ParallelGC)" prio=10 tid=0x00007f7d9401c800 nid=0x67a runnable

"GC task thread#7 (ParallelGC)" prio=10 tid=0x00007f7d9401e000 nid=0x67b runnable

"GC task thread#8 (ParallelGC)" prio=10 tid=0x00007f7d94020000 nid=0x67c runnable

"GC task thread#9 (ParallelGC)" prio=10 tid=0x00007f7d94022000 nid=0x67d runnable

"GC task thread#10 (ParallelGC)" prio=10 tid=0x00007f7d94023800 nid=0x67e runnable

"GC task thread#11 (ParallelGC)" prio=10 tid=0x00007f7d94025800 nid=0x67f runnable

"GC task thread#12 (ParallelGC)" prio=10 tid=0x00007f7d94027800 nid=0x680 runnable

"VM Periodic Task Thread" prio=10 tid=0x00007f7d940b2000 nid=0x689 waiting on condition

JNI global references: 1000

Heap
12:15:06:: consumer_thread_2 acquired the lock :: PSYoungGen      total 112448K, used 17376K [0x00007f7d13760000, 0x00007f7d1b4e0000, 0x00007f7d91000000)
  eden space 96384K, 18% used [0x00007f7d13760000,0x00007f7d148581c0,0x00007f7d19580000)
  from space 16064K, 0% used [0x00007f7d1a530000,0x00007f7d1a530000,0x00007f7d1b4e0000)
  to   space 16064K, 0% used [0x00007f7d19580000,0x00007f7d19580000,0x00007f7d1a530000)
 PSOldGen        total 257152K, used 0K [0x00007f7c18600000, 0x00007f7c28120000, 0x00007f7d13760000)
  object space 257152K, 0% used [0x00007f7c18600000,0x00007f7c18600000,0x00007f7c28120000)
 PSPermGen       total 21248K, used 7359K [0x00007f7c0de00000, 0x00007f7c0f2c0000, 0x00007f7c18600000)
  object space 21248K, 34% used [0x00007f7c0de00000,0x00007f7c0e52fe38,0x00007f7c0f2c0000)

Solution

  • (copied from comments)

    Did this behaviour start last weekend? Could it be related to the leap second issue that a number of people have run into? (Symptom: very high CPU usage by the Java process, which might cause thread sleeps to take much longer than normal.)


    If this is indeed the issue, one solution is to simply reboot the system. Another solution is to run:

    date -s "`date`"
    

    as suggested by this article.
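
    To check whether sleeps really overrun on the affected machine, it may help to time them outside the application. The following is only a minimal sketch, not part of the original program: the class name SleepOvershootCheck and the method measureOvershootMillis are made up for illustration, and the 100 ms value simply mirrors consumerSleepTime from the question. It measures elapsed time with System.nanoTime(), so the measurement does not rely on the wall-clock time that date -s resets.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the application in the question.
class SleepOvershootCheck {

    /**
     * Sleeps for requestedMillis and returns how many milliseconds longer
     * than requested the sleep took, measured with System.nanoTime().
     */
    static long measureOvershootMillis(long requestedMillis) {
        long start = System.nanoTime();
        try {
            Thread.sleep(requestedMillis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the measurement is then cut short.
            Thread.currentThread().interrupt();
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return elapsedMillis - requestedMillis;
    }

    public static void main(String[] args) {
        long requestedMillis = 100; // assumed: same value as consumerSleepTime
        for (int i = 0; i < 5; i++) {
            long overshoot = measureOvershootMillis(requestedMillis);
            System.out.println("requested " + requestedMillis
                    + " ms, overshoot " + overshoot + " ms");
        }
    }
}
```

    On a healthy system the reported overshoot should stay small. If it is consistently large before the reboot or the date reset and small afterwards, that would point towards the system rather than the application code.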